mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 06:57:47 +01:00
108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
import logging
|
|
import asyncio
|
|
|
|
from celery import shared_task
|
|
|
|
import app.services.bank_scraper
|
|
|
|
# Module-wide logger used by every Celery task in this file.
logger = logging.getLogger("celery_tasks")
logger.setLevel(logging.INFO)

if not logger.handlers:
    # Attach a stream handler only when the logger has none yet, so the same
    # record is not emitted through several handlers.
    _h = logging.StreamHandler()
    logger.addHandler(_h)
|
|
|
|
|
|
def run_coro(coro) -> None:
    """Run an async coroutine to completion from synchronous code.

    The primary strategy drives the coroutine on a fresh event loop in the
    current thread (``run_forever`` plus a stop callback instead of
    ``run_until_complete``). If the loop itself cannot be created or driven,
    e.g. "Bad file descriptor" caused by debugger patches such as
    pydevd_nest_asyncio, the coroutine is run in a dedicated thread with its
    own event loop instead.

    An exception raised by the coroutine itself is re-raised to the caller
    unchanged and never triggers the fallback: a coroutine object can only be
    awaited once, so retrying it would merely replace the real error with
    ``RuntimeError: cannot reuse already awaited coroutine``.

    Args:
        coro: The coroutine object to execute.

    Raises:
        BaseException: Whatever the coroutine raised, or the error that made
            the fallback thread fail.
    """
    import threading

    def _cleanup_loop(loop):
        """Cancel leftover tasks, close ``loop`` and detach it from the thread."""
        try:
            pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
            for t in pending:
                t.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        except Exception:
            # Teardown is best-effort: never let it mask the coroutine's outcome.
            logger.debug("run_coro: error while draining event loop", exc_info=True)
        finally:
            try:
                loop.close()
            finally:
                asyncio.set_event_loop(None)

    def _run_on(loop):
        """Drive ``coro`` on ``loop``; return the exception it raised, or None."""
        asyncio.set_event_loop(loop)
        task = loop.create_task(coro)
        # Stop the loop as soon as the coroutine finishes, whatever the outcome.
        task.add_done_callback(lambda _t: loop.stop())
        loop.run_forever()
        return task.exception()

    # First attempt: run in the current thread with a fresh event loop.
    try:
        loop = asyncio.new_event_loop()
        try:
            failure = _run_on(loop)
        finally:
            _cleanup_loop(loop)
    except OSError as e:
        logger.warning("run_coro primary strategy failed (%s). Falling back to thread runner.", e)
    except Exception:
        # For any other unexpected loop errors, try the thread fallback as well.
        logger.exception("run_coro primary strategy raised; attempting thread fallback")
    else:
        # The coroutine ran to completion here, so its outcome is final. Raising
        # from this branch bypasses the handlers above and skips the fallback.
        if failure is not None:
            raise failure
        return

    # Fallback: run in a dedicated thread with its own event loop.
    thread_error = None

    def _thread_target():
        nonlocal thread_error
        try:
            loop = asyncio.new_event_loop()
            try:
                thread_error = _run_on(loop)
            finally:
                _cleanup_loop(loop)
        except (Exception, asyncio.CancelledError) as exc:
            # Record infrastructure failures too; otherwise the thread would die
            # quietly and the caller would treat the run as successful.
            if thread_error is None:
                thread_error = exc

    th = threading.Thread(target=_thread_target, name="celery-async-runner", daemon=True)
    th.start()
    th.join()
    if thread_error is not None:
        raise thread_error
|
|
|
|
|
|
@shared_task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
    """Celery task that records an outgoing email in the log.

    Nothing is delivered here: the task only checks that all three fields
    are non-empty and logs either an error or a confirmation line.

    Args:
        to: Recipient address.
        subject: Subject line of the message.
        body: Message text; only its length is logged.
    """
    body_len = len(body) if body else 0

    if not to or not subject or not body:
        logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, body_len)
        return

    # Placeholder for real email sending logic
    logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, body_len)
|
|
|
|
|
|
@shared_task(name="workers.load_transactions")
def load_transactions(user_id: str) -> None:
    """Celery task that loads bank transactions for a single user.

    Runs ``aload_ceska_sporitelna_transactions`` from the bank scraper
    service synchronously via ``run_coro``. An empty ``user_id`` is logged
    as an error and the task returns without doing any work.

    Args:
        user_id: Identifier of the user whose transactions are loaded.
    """
    if user_id:
        scrape = app.services.bank_scraper.aload_ceska_sporitelna_transactions(user_id)
        run_coro(scrape)
        logger.info("[Celery] Transactions loaded for user_id=%s", user_id)
    else:
        logger.error("Load transactions task missing user_id.")
|
|
|
|
|
|
@shared_task(name="workers.load_all_transactions")
def load_all_transactions() -> None:
    """Celery task that runs the bulk bank-transaction load.

    Executes ``aload_all_ceska_sporitelna_transactions`` from the bank
    scraper service synchronously via ``run_coro`` and logs a line before
    and after the run.
    """
    logger.info("[Celery] Starting load_all_transactions")

    bulk_load = app.services.bank_scraper.aload_all_ceska_sporitelna_transactions()
    run_coro(bulk_load)

    logger.info("[Celery] Finished load_all_transactions")
|