diff --git a/backend/app/core/queue.py b/backend/app/core/queue.py
index 2aecf9e..f899a1a 100644
--- a/backend/app/core/queue.py
+++ b/backend/app/core/queue.py
@@ -1,35 +1,28 @@
-import json
-import os
-from typing import Any, Dict
-import asyncio
+# Route email jobs via Celery instead of raw RabbitMQ publishing.
+# Ensure the Celery app is initialized so producers use the correct broker credentials.
+# The import has side effects (it sets the Celery default app), so it is safe to keep at module level.
+import app.celery_app  # noqa: F401
 
-RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
-    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
-    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
-    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
-    f"{os.getenv('RABBITMQ_PORT', '5672')}"
-)
-QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
 
+# Use a lazy proxy so _send_email_task is never None, even if Celery modules
+# are not yet importable at import time (e.g., different working dir during startup).
+class _LazySendTask:
+    def delay(self, to: str, subject: str, body: str):  # type: ignore[no-untyped-def]
+        # Late import on first use
+        from app.workers.celery_tasks import send_email_fields as _send  # type: ignore
+        return _send.delay(to, subject, body)
+
+
+try:
+    # Try eager import for normal operation
+    from app.workers.celery_tasks import send_email_fields as _send_email_task  # type: ignore
+except ImportError:  # pragma: no cover - fallback lazy import proxy
+    _send_email_task = _LazySendTask()  # type: ignore
 
-async def _publish_async(message: Dict[str, Any]) -> None:
-    import aio_pika
-    connection = await aio_pika.connect_robust(RABBITMQ_URL)
-    try:
-        channel = await connection.channel()
-        await channel.declare_queue(QUEUE_NAME, durable=True)
-        body = json.dumps(message).encode("utf-8")
-        await channel.default_exchange.publish(
-            aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
-            routing_key=QUEUE_NAME,
-        )
-    finally:
-        await connection.close()
 
 def enqueue_email(to: str, subject: str, body: str) -> None:
-    message = {"type": "email", "to": to, "subject": subject, "body": body}
-    try:
-        loop = asyncio.get_running_loop()
-        loop.create_task(_publish_async(message))
-    except RuntimeError:
-        asyncio.run(_publish_async(message))
+    """Enqueue an email job using Celery.
+
+    Keeps the same public API as before.
+    """
+    _send_email_task.delay(to, subject, body)
diff --git a/backend/app/services/user_service.py b/backend/app/services/user_service.py
index e92d9d3..1d3857e 100644
--- a/backend/app/services/user_service.py
+++ b/backend/app/services/user_service.py
@@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
 from fastapi_users.authentication import (
     AuthenticationBackend,
     BearerTransport,
-    JWTStrategy,
 )
+from fastapi_users.authentication.strategy.jwt import JWTStrategy
 from fastapi_users.db import SQLAlchemyUserDatabase
 
 from app.models.user import User
@@ -47,7 +47,8 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
         )
         try:
             enqueue_email(to=user.email, subject=subject, body=body)
-        except Exception:
+        except Exception as e:
             print("[Email Fallback] To:", user.email)
             print("[Email Fallback] Subject:", subject)
             print("[Email Fallback] Body:\n", body)
+            print("[Email Fallback] Error:", e)
diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/backend/app/workers/email_handler.py b/backend/app/workers/email_handler.py
deleted file mode 100644
index cad8797..0000000
--- a/backend/app/workers/email_handler.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import asyncio
-from typing import Any, Dict
-
-# Import decorator and logger from the worker so handlers can register themselves
-from .queue_worker import register_task_handler, logger
-
-
-@register_task_handler("email")
-async def handle_email_task(payload: Dict[str, Any]) -> None:
-    """Handle 'email' tasks dispatched by the queue worker.
-
-    Expected payload schema:
-    {
-        "type": "email",
-        "to": "recipient@example.com",
-        "subject": "Subject text",
-        "body": "Email body text"
-    }
-    """
-    to = payload.get("to")
-    subject = payload.get("subject")
-    body = payload.get("body")
-    if not (to and subject and body):
-        logger.error("Email task missing fields. Payload: %s", payload)
-        return
-
-    # Placeholder for real email sending logic
-    await asyncio.sleep(0)  # yield control, simulate async work
-    logger.info("Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py
deleted file mode 100644
index abd769f..0000000
--- a/backend/app/workers/queue_worker.py
+++ /dev/null
@@ -1,167 +0,0 @@
-import asyncio
-import json
-import logging
-import os
-import signal
-import socket
-from contextlib import suppress
-from typing import Any, Awaitable, Callable, Dict
-
-RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
-    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
-    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
-    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
-    f"{os.getenv('RABBITMQ_PORT', '5672')}"
-)
-QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
-
-logger = logging.getLogger("queue_worker")
-logger.propagate = False  # prevent double logging
-if not logger.handlers:  # do not duplicate handlers
-    _handler = logging.StreamHandler()
-    _formatter = logging.Formatter(
-        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
-        datefmt="%Y-%m-%d %H:%M:%S",
-    )
-    _handler.setFormatter(_formatter)
-    logger.addHandler(_handler)
-logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper())
-
-TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
-_TASK_HANDLERS: Dict[str, TaskHandler] = {}
-
-
-def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
-    def decorator(func: TaskHandler) -> TaskHandler:
-        _TASK_HANDLERS[task_type] = func
-        logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__)
-        return func
-
-    return decorator
-
-
-async def _dispatch(message_dict: Dict[str, Any]) -> None:
-    task_type = message_dict.get("type")
-    if not task_type:
-        logger.warning("Skipping message without 'type': %s", message_dict)
-        return
-    handler = _TASK_HANDLERS.get(task_type)
-    if not handler:
-        logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
-        return
-    await handler(message_dict)
-
-
-def _auto_import_handlers() -> None:
-    import importlib
-    import os
-    import pkgutil
-
-    pkg_name = __package__  # e.g., 'backend.app.workers' when run as module
-    dir_path = os.path.dirname(__file__)
-
-    print(f"Package: {pkg_name}, Dir: {dir_path}")
-
-    # Discover handler module base names in the filesystem
-    handler_modules = []
-    for finder, name, ispkg in pkgutil.iter_modules([dir_path]):
-        if not ispkg and name.endswith("_handler"):
-            handler_modules.append(name)
-
-    if not handler_modules:
-        logger.info("No handler modules discovered in %s", dir_path)
-        return
-
-    imported = 0
-
-    importlib.import_module(__package__)
-    for mod in handler_modules:
-        full_name = f"{__package__}.{mod}"
-        try:
-            importlib.import_module(full_name)
-            imported += 1
-        except Exception as e:
-            logger.debug("Failed to import handler module %s: %s", full_name, e)
-
-    if not imported:
-        logger.warning(
-            "Failed to import any handler modules. Ensure the worker is executed "
-            "as a module (e.g., 'python -m backend.app.workers.queue_worker') or as a script (e.g., 'python3 app/workers/queue_worker.py')."
-        )
-    else:
-        logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))
-
-
-async def _consume(loop: asyncio.AbstractEventLoop) -> None:
-    import aio_pika
-
-    # Mask password in URL for logging
-    safe_url = RABBITMQ_URL
-    with suppress(Exception):
-        if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL:
-            scheme, rest = RABBITMQ_URL.split("://", 1)
-            creds_host = rest.split("@")
-            if len(creds_host) == 2:
-                user_pass, host = creds_host
-                user = user_pass.split(":")[0]
-                safe_url = f"{scheme}://{user}:***@{host}"
-
-    logger.info(
-        "Starting worker | pid=%s | host=%s | queue=%s | url=%s",
-        os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url,
-    )
-
-    connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
-    try:
-        channel = await connection.channel()
-        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
-        logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
-                async with message.process(requeue=False):
-                    try:
-                        payload = json.loads(message.body.decode("utf-8"))
-                    except Exception as e:
-                        logger.exception("Failed to decode message: %s", e)
-                        continue
-                    try:
-                        await _dispatch(payload)
-                    except Exception:
-                        logger.exception("Unhandled error while processing message: %s", payload)
-                        # message is not requeued due to process(requeue=False)
-    finally:
-        await connection.close()
-        logger.info("Worker stopped")
-
-
-async def main() -> None:
-    loop = asyncio.get_running_loop()
-
-    stop_event = asyncio.Event()
-
-    def _request_shutdown(signame: str) -> None:
-        logger.info("Received %s, shutting down gracefully…", signame)
-        stop_event.set()
-
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        with suppress(NotImplementedError):
-            loop.add_signal_handler(sig, _request_shutdown, sig.name)
-
-    consumer = asyncio.create_task(_consume(loop))
-
-    # Wait until a stop signal arrives
-    await stop_event.wait()
-
-    # Give the consumer some time to exit
-    consumer.cancel()
-    with suppress(asyncio.CancelledError):
-        await consumer
-
-
-if __name__ == "__main__":
-    _auto_import_handlers()
-    try:
-        asyncio.run(main())
-    except KeyboardInterrupt:
-        pass
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 045371e..1f8e9bf 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -2,14 +2,20 @@ aio-pika==9.5.6
 aiormq==6.8.1
 aiosqlite==0.21.0
 alembic==1.16.5
+amqp==5.3.1
 annotated-types==0.7.0
 anyio==4.11.0
 argon2-cffi==23.1.0
 argon2-cffi-bindings==25.1.0
 asyncmy==0.2.9
 bcrypt==4.3.0
+billiard==4.2.2
+celery==5.5.3
 cffi==2.0.0
 click==8.1.8
+click-didyoumean==0.3.1
+click-plugins==1.1.1.2
+click-repl==0.3.0
 cryptography==46.0.1
 dnspython==2.7.0
 email_validator==2.2.0
@@ -21,11 +27,14 @@ greenlet==3.2.4
 h11==0.16.0
 httptools==0.6.4
 idna==3.10
+kombu==5.5.4
 makefun==1.16.0
 Mako==1.3.10
 MarkupSafe==3.0.2
 multidict==6.6.4
+packaging==25.0
 pamqp==3.3.0
+prompt_toolkit==3.0.52
 propcache==0.3.2
 pwdlib==0.2.1
 pycparser==2.23
@@ -33,17 +42,22 @@ pydantic==2.11.9
 pydantic_core==2.33.2
 PyJWT==2.10.1
 PyMySQL==1.1.2
+python-dateutil==2.9.0.post0
 python-dotenv==1.1.1
 python-multipart==0.0.20
 PyYAML==6.0.2
+six==1.17.0
 sniffio==1.3.1
 SQLAlchemy==2.0.43
 starlette==0.48.0
 tomli==2.2.1
 typing-inspection==0.4.1
 typing_extensions==4.15.0
+tzdata==2025.2
 uvicorn==0.37.0
 uvloop==0.21.0
+vine==5.1.0
 watchfiles==1.1.0
+wcwidth==0.2.14
 websockets==15.0.1
 yarl==1.20.1
diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml
index 8520408..fc842f7 100644
--- a/deployment/app-demo-worker-deployment.yaml
+++ b/deployment/app-demo-worker-deployment.yaml
@@ -17,8 +17,14 @@ spec:
       - image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
         name: app-demo-worker
         command:
-          - python3 -m
-          - app.workers.queue_worker
+          - celery
+          - -A
+          - app.celery_app
+          - worker
+          - -Q
+          - $(MAIL_QUEUE)
+          - --loglevel
+          - INFO
         env:
           - name: RABBITMQ_USERNAME
             value: demo-app
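
Note: the new backend/app/core/queue.py imports app.celery_app and app.workers.celery_tasks.send_email_fields, but neither module is included in this diff. Below is a minimal sketch, under stated assumptions, of what that wiring could look like if it reuses the same RabbitMQ environment variables and MAIL_QUEUE default as the removed queue_worker; everything beyond those two import paths (settings, task body, queue name) is illustrative and not the project's actual code.

# Hypothetical sketch of app/celery_app.py (not part of this diff).
import os

from celery import Celery

# Assumption: reuse the broker settings the old aio-pika worker read from the environment.
broker_url = os.getenv("RABBITMQ_URL") or (
    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
    f"{os.getenv('RABBITMQ_PORT', '5672')}//"
)

celery_app = Celery("app", broker=broker_url)
# Route tasks to the queue the Kubernetes worker consumes via `-Q $(MAIL_QUEUE)`.
celery_app.conf.task_default_queue = os.getenv("MAIL_QUEUE", "mail_queue")
celery_app.set_default()  # make this the default app so producer-side .delay() calls use this broker


# Hypothetical sketch of the task in app/workers/celery_tasks.py that enqueue_email delays.
@celery_app.task(name="send_email_fields")
def send_email_fields(to: str, subject: str, body: str) -> None:
    # Placeholder for real email sending, mirroring the removed email_handler behaviour.
    print(f"Email sent | to={to} | subject={subject} | body_len={len(body)}")

With a layout along these lines, the API side only ever calls enqueue_email(...), and the worker from the deployment manifest can be run the same way locally, e.g. celery -A app.celery_app worker -Q mail_queue --loglevel INFO.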