"""Queue worker: consume JSON task messages and dispatch them to handlers.

Handlers are registered per task type with ``register_task_handler`` and are
looked up by the ``"type"`` key of each decoded message.
"""
import asyncio
import json
import logging
import os
import signal
import socket
from contextlib import suppress
from typing import Any, Awaitable, Callable, Dict

# Build RabbitMQ URL consistent with core.queue
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
    f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")

# Logging configuration
logger = logging.getLogger("queue_worker")
logger.propagate = False  # prevent double logging via root logger
if not logger.handlers:  # guard against multiple imports creating duplicate handlers
    _handler = logging.StreamHandler()
    _formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)

# An unrecognised level name makes ``setLevel`` raise ValueError; a typo in the
# environment should not prevent the worker from starting, so fall back to INFO.
_level_name = os.getenv("WORKER_LOG_LEVEL", "INFO").upper()
try:
    logger.setLevel(_level_name)
except ValueError:
    logger.setLevel(logging.INFO)
    logger.warning("Unknown WORKER_LOG_LEVEL=%r, falling back to INFO", _level_name)

# Task handler registry
TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
_TASK_HANDLERS: Dict[str, TaskHandler] = {}


def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
    """Return a decorator that registers a coroutine as the handler for *task_type*.

    Registering a second handler for the same type replaces the first. The
    decorated callable is returned unchanged.
    """

    def decorator(func: TaskHandler) -> TaskHandler:
        _TASK_HANDLERS[task_type] = func
        # Not every callable has ``__name__`` (e.g. functools.partial objects),
        # so fall back to repr() instead of failing the registration.
        handler_name = getattr(func, "__name__", None) or repr(func)
        logger.debug("Registered task handler for type='%s' -> %s", task_type, handler_name)
        return func

    return decorator


async def _dispatch(message_dict: Dict[str, Any]) -> None:
    """Route a decoded message to the handler registered for its ``"type"``.

    Messages that are not dicts, have no ``"type"``, or have a type with no
    registered handler are logged at WARNING level and skipped. Exceptions
    raised by the handler itself propagate to the caller.
    """
    # Valid JSON is not necessarily an object; lists, strings and numbers
    # have no ``.get`` and cannot carry a task type.
    if not isinstance(message_dict, dict):
        logger.warning("Skipping non-object message payload: %r", message_dict)
        return

    task_type = message_dict.get("type")
    if not task_type:
        logger.warning("Skipping message without 'type': %s", message_dict)
        return

    # Registry keys are strings; a non-string type (possibly unhashable, which
    # would make the dict lookup raise TypeError) can never match a handler.
    handler = _TASK_HANDLERS.get(task_type) if isinstance(task_type, str) else None
    if not handler:
        logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
        return

    await handler(message_dict)


# Auto-load all handler modules in this package so decorators can register
# without manual imports. Any file matching '*_handler.py' will be imported.
def _auto_import_handlers() -> None:
    """Import every ``*_handler`` module found next to this file.

    Importing the modules is what lets their ``register_task_handler``
    decorators run. Several package base names are tried in order, and the
    first base under which at least one handler module imports wins.
    """
    import importlib
    import pkgutil
    import sys

    pkg_name = __package__  # e.g., 'backend.app.workers' when run as module
    dir_path = os.path.dirname(__file__)

    # Discover handler module base names in the filesystem
    handler_modules = [
        name
        for _finder, name, ispkg in pkgutil.iter_modules([dir_path])
        if not ispkg and name.endswith("_handler")
    ]
    if not handler_modules:
        logger.info("No handler modules discovered in %s", dir_path)
        return

    # Prefer importing using the current package name so relative imports in
    # handler modules (e.g., from .queue_worker import register_task_handler)
    # continue to work.
    bases_to_try = []
    if pkg_name:
        bases_to_try.append(pkg_name)
    else:
        # Executed as a script (no package): make 'app' importable and alias
        # this module as 'app.workers.queue_worker' so that handler relative
        # imports resolve to this already-loaded module.
        backend_dir = os.path.abspath(os.path.join(__file__, "../../.."))  # .../backend
        if backend_dir not in sys.path:
            sys.path.insert(0, backend_dir)
        try:
            importlib.import_module("app.workers")
            sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__])
        except Exception as e:
            logger.debug("Failed to prepare package context for handlers: %s", e)
        bases_to_try.append("app.workers")

    # Also try common alternative base when the whole project is a package
    for base in ("backend.app.workers", "app.workers"):
        if base not in bases_to_try:
            bases_to_try.append(base)

    imported_names = []
    failed_names = []
    for base in bases_to_try:
        try:
            importlib.import_module(base)
        except Exception:
            continue

        failed_names = []
        for mod in handler_modules:
            full_name = f"{base}.{mod}"
            try:
                importlib.import_module(full_name)
            except Exception as e:
                logger.debug("Failed to import handler module %s: %s", full_name, e)
                failed_names.append(mod)
            else:
                imported_names.append(mod)
        if imported_names:
            break

    if not imported_names:
        logger.warning(
            "Failed to import any handler modules. Ensure the worker is executed "
            "as a module (e.g., 'python -m backend.app.workers.queue_worker')."
        )
        return

    # Report only the modules that actually loaded, not everything discovered.
    logger.info(
        "Imported %d handler module(s): %s",
        len(imported_names),
        ", ".join(imported_names),
    )
    if failed_names:
        # A partially loaded handler set means some task types have no handler;
        # make that visible above DEBUG level.
        logger.warning(
            "Handler module(s) that failed to import: %s", ", ".join(failed_names)
        )


# Trigger auto-discovery of handlers at import time
_auto_import_handlers()


def _mask_url(url: str) -> str:
    """Return *url* with the password replaced by ``***`` for logging.

    The split is made at the LAST ``@`` and the user name ends at the FIRST
    ``:`` of the credentials. The default URL is built from raw environment
    values, so the password may itself contain ``@`` or ``:``; this split
    never lets any part of it through. URLs without credentials or without a
    password are returned unchanged.
    """
    scheme, sep, rest = url.partition("://")
    if not sep:
        return url
    credentials, at, host = rest.rpartition("@")
    if not at:
        return url
    user, colon, _password = credentials.partition(":")
    if not colon:
        return url
    return f"{scheme}://{user}:***@{host}"


async def _consume(loop: asyncio.AbstractEventLoop) -> None:
    """Connect to RabbitMQ and process messages from ``QUEUE_NAME`` until cancelled.

    Each message body is decoded as UTF-8 JSON and passed to ``_dispatch``.
    Decode failures and handler exceptions are logged and the loop moves on
    to the next message. The connection is closed when the coroutine exits
    for any reason, including cancellation.
    """
    import aio_pika

    logger.info(
        "Starting worker | pid=%s | host=%s | queue=%s | url=%s",
        os.getpid(),
        socket.gethostname(),
        QUEUE_NAME,
        _mask_url(RABBITMQ_URL),
    )

    connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception as e:
                        logger.exception("Failed to decode message: %s", e)
                        continue

                    try:
                        await _dispatch(payload)
                    except Exception:
                        # The error is swallowed here, so the process() context
                        # exits normally and the message is not redelivered.
                        # NOTE(review): this presumably acks failed messages;
                        # confirm against aio_pika's process() semantics.
                        logger.exception(
                            "Unhandled error while processing message: %s", payload
                        )
    finally:
        await connection.close()
        logger.info("Worker stopped")


async def main() -> None:
    """Run the consumer until SIGINT/SIGTERM arrives or the consumer exits.

    Raises:
        Exception: whatever ``_consume`` raised if it failed on its own
            (for example when the broker connection cannot be established),
            so the process exits instead of idling without a consumer.
    """
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def _request_shutdown(signame: str) -> None:
        logger.info("Received %s, shutting down gracefully…", signame)
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler is not implemented on every platform/event loop.
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _request_shutdown, sig.name)

    consumer = asyncio.create_task(_consume(loop))
    stop_waiter = asyncio.create_task(stop_event.wait())

    # Wake up on a stop signal OR when the consumer finishes by itself;
    # waiting on the event alone would hang forever after a consumer crash.
    await asyncio.wait({consumer, stop_waiter}, return_when=asyncio.FIRST_COMPLETED)
    stop_waiter.cancel()

    if consumer.done():
        if not stop_event.is_set():
            logger.error("Consumer exited without a shutdown request")
    else:
        # Shutdown requested: cancel the consumer; its ``finally`` block
        # closes the connection.
        consumer.cancel()

    # Re-raises the consumer's own failure, if any; cancellation is expected.
    with suppress(asyncio.CancelledError):
        await consumer


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass