feat(oauth): add csas connection, allow oauth from react

This commit is contained in:
2025-10-21 22:01:09 +02:00
parent be4a3b401a
commit 3ebf47e371
13 changed files with 509 additions and 20 deletions

View File

@@ -1,7 +1,10 @@
import logging
import asyncio
from celery import shared_task
import app.services.bank_scraper
logger = logging.getLogger("celery_tasks")
if not logger.handlers:
_h = logging.StreamHandler()
@@ -9,6 +12,72 @@ if not logger.handlers:
logger.setLevel(logging.INFO)
def run_coro(coro) -> None:
"""Run an async coroutine in a fresh event loop without using run_until_complete.
Primary strategy runs in a new loop in the current thread. If that fails due to
debugger patches (e.g., Bad file descriptor from pydevd_nest_asyncio), fall back
to running in a dedicated thread with its own event loop.
"""
import threading
def _cleanup_loop(loop):
try:
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
for t in pending:
t.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
pass
finally:
try:
loop.close()
finally:
asyncio.set_event_loop(None)
# First attempt: Run in current thread with a fresh event loop
try:
loop = asyncio.get_event_loop_policy().new_event_loop()
try:
asyncio.set_event_loop(loop)
task = loop.create_task(coro)
task.add_done_callback(lambda _t: loop.stop())
loop.run_forever()
exc = task.exception()
if exc:
raise exc
return
finally:
_cleanup_loop(loop)
except OSError as e:
logger.warning("run_coro primary strategy failed (%s). Falling back to thread runner.", e)
except Exception:
# For any other unexpected errors, try thread fallback as well
logger.exception("run_coro primary strategy raised; attempting thread fallback")
# Fallback: Run in a dedicated thread with its own event loop
error = {"exc": None}
def _thread_target():
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
task = loop.create_task(coro)
task.add_done_callback(lambda _t: loop.stop())
loop.run_forever()
exc = task.exception()
if exc:
error["exc"] = exc
finally:
_cleanup_loop(loop)
th = threading.Thread(target=_thread_target, name="celery-async-runner", daemon=True)
th.start()
th.join()
if error["exc"] is not None:
raise error["exc"]
@shared_task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
if not (to and subject and body):
@@ -17,3 +86,22 @@ def send_email(to: str, subject: str, body: str) -> None:
# Placeholder for real email sending logic
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
@shared_task(name="workers.load_transactions")
def load_transactions(user_id: str) -> None:
if not user_id:
logger.error("Load transactions task missing user_id.")
return
run_coro(app.services.bank_scraper.aload_ceska_sporitelna_transactions(user_id))
# Placeholder for real transaction loading logic
logger.info("[Celery] Transactions loaded for user_id=%s", user_id)
@shared_task(name="workers.load_all_transactions")
def load_all_transactions() -> None:
logger.info("[Celery] Starting load_all_transactions")
run_coro(app.services.bank_scraper.aload_all_ceska_sporitelna_transactions())
logger.info("[Celery] Finished load_all_transactions")