import logging import asyncio import os import smtplib from email.message import EmailMessage from celery import shared_task import app.services.bank_scraper logger = logging.getLogger("celery_tasks") if not logger.handlers: _h = logging.StreamHandler() logger.addHandler(_h) logger.setLevel(logging.INFO) def run_coro(coro) -> None: """Run an async coroutine in a fresh event loop without using run_until_complete. Primary strategy runs in a new loop in the current thread. If that fails due to debugger patches (e.g., Bad file descriptor from pydevd_nest_asyncio), fall back to running in a dedicated thread with its own event loop. """ import threading def _cleanup_loop(loop): try: pending = [t for t in asyncio.all_tasks(loop) if not t.done()] for t in pending: t.cancel() if pending: loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) except Exception: pass finally: try: loop.close() finally: asyncio.set_event_loop(None) # First attempt: Run in current thread with a fresh event loop try: loop = asyncio.get_event_loop_policy().new_event_loop() try: asyncio.set_event_loop(loop) task = loop.create_task(coro) task.add_done_callback(lambda _t: loop.stop()) loop.run_forever() exc = task.exception() if exc: raise exc return finally: _cleanup_loop(loop) except OSError as e: logger.warning("run_coro primary strategy failed (%s). Falling back to thread runner.", e) except Exception: # For any other unexpected errors, try thread fallback as well logger.exception("run_coro primary strategy raised; attempting thread fallback") # Fallback: Run in a dedicated thread with its own event loop error = {"exc": None} def _thread_target(): loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) task = loop.create_task(coro) task.add_done_callback(lambda _t: loop.stop()) loop.run_forever() exc = task.exception() if exc: error["exc"] = exc finally: _cleanup_loop(loop) th = threading.Thread(target=_thread_target, name="celery-async-runner", daemon=True) th.start() th.join() if error["exc"] is not None: raise error["exc"] @shared_task(name="workers.send_email") def send_email(to: str, subject: str, body: str) -> None: if not (to and subject and body): logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0) return host = os.getenv("SMTP_HOST") if not host: logger.error("SMTP_HOST is not configured; cannot send email") return # Configuration port = int(os.getenv("SMTP_PORT", "25")) username = os.getenv("SMTP_USERNAME") password = os.getenv("SMTP_PASSWORD") use_tls = os.getenv("SMTP_USE_TLS", "0").lower() in {"1", "true", "yes"} use_ssl = os.getenv("SMTP_USE_SSL", "0").lower() in {"1", "true", "yes"} timeout = int(os.getenv("SMTP_TIMEOUT", "10")) mail_from = os.getenv("SMTP_FROM") or username or "noreply@localhost" # Build message msg = EmailMessage() msg["To"] = to msg["From"] = mail_from msg["Subject"] = subject msg.set_content(body) try: if use_ssl: with smtplib.SMTP_SSL(host=host, port=port, timeout=timeout) as smtp: if username and password: smtp.login(username, password) smtp.send_message(msg) else: with smtplib.SMTP(host=host, port=port, timeout=timeout) as smtp: # STARTTLS if requested if use_tls: smtp.starttls() if username and password: smtp.login(username, password) smtp.send_message(msg) logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) except Exception: logger.exception("Failed to send email via SMTP to=%s subject=%s host=%s port=%s tls=%s ssl=%s", to, subject, host, port, use_tls, use_ssl) @shared_task(name="workers.load_transactions") def load_transactions(user_id: str) -> None: if not user_id: logger.error("Load transactions task missing user_id.") return run_coro(app.services.bank_scraper.aload_ceska_sporitelna_transactions(user_id)) # Placeholder for real transaction loading logic logger.info("[Celery] Transactions loaded for user_id=%s", user_id) @shared_task(name="workers.load_all_transactions") def load_all_transactions() -> None: logger.info("[Celery] Starting load_all_transactions") run_coro(app.services.bank_scraper.aload_all_ceska_sporitelna_transactions()) logger.info("[Celery] Finished load_all_transactions")