diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index 48c7078..abd769f 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -7,7 +7,6 @@ import socket from contextlib import suppress from typing import Any, Awaitable, Callable, Dict -# Build RabbitMQ URL consistent with core.queue RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" @@ -16,10 +15,9 @@ RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( ) QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") -# Logging configuration logger = logging.getLogger("queue_worker") -logger.propagate = False # prevent double logging via root logger -if not logger.handlers: # guard against multiple imports creating duplicate handlers +logger.propagate = False # prevent double logging +if not logger.handlers: # do not duplicate handlers _handler = logging.StreamHandler() _formatter = logging.Formatter( fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", @@ -29,17 +27,19 @@ if not logger.handlers: # guard against multiple imports creating duplicate han logger.addHandler(_handler) logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper()) -# Task handler registry TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]] _TASK_HANDLERS: Dict[str, TaskHandler] = {} + def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]: def decorator(func: TaskHandler) -> TaskHandler: _TASK_HANDLERS[task_type] = func logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__) return func + return decorator + async def _dispatch(message_dict: Dict[str, Any]) -> None: task_type = message_dict.get("type") if not task_type: @@ -51,18 +51,17 @@ async def _dispatch(message_dict: Dict[str, Any]) -> None: return await handler(message_dict) -# Auto-load all handler modules in this package so decorators can register -# without manual imports. Any file matching '*_handler.py' will be imported. def _auto_import_handlers() -> None: import importlib import os import pkgutil - import sys pkg_name = __package__ # e.g., 'backend.app.workers' when run as module dir_path = os.path.dirname(__file__) + print(f"Package: {pkg_name}, Dir: {dir_path}") + # Discover handler module base names in the filesystem handler_modules = [] for finder, name, ispkg in pkgutil.iter_modules([dir_path]): @@ -75,48 +74,14 @@ def _auto_import_handlers() -> None: imported = 0 - # Prefer importing using the current package name so relative imports in - # handler modules (e.g., from .queue_worker import register_task_handler) - # continue to work. - bases_to_try = [] - if pkg_name: - bases_to_try.append(pkg_name) - - # If executed as a script (no package), make 'app' importable so that handler modules - # can be imported using absolute imports. Handler modules must use absolute imports - # (e.g., 'from backend.app.workers.queue_worker import register_task_handler'). - if not pkg_name: - backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend - if backend_dir not in sys.path: - sys.path.insert(0, backend_dir) + importlib.import_module(__package__) + for mod in handler_modules: + full_name = f"{__package__}.{mod}" try: - importlib.import_module("app.workers") - sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__]) + importlib.import_module(full_name) + imported += 1 except Exception as e: - logger.debug("Failed to prepare package context for handlers: %s", e) - # Ensure we try importing under 'app.workers' - if "app.workers" not in bases_to_try: - bases_to_try.append("app.workers") - - # Also try common alternative base when the whole project is a package - for base in ("backend.app.workers", "app.workers"): - if base not in bases_to_try: - bases_to_try.append(base) - - for base in bases_to_try: - try: - importlib.import_module(base) - except Exception: - continue - for mod in handler_modules: - full_name = f"{base}.{mod}" - try: - importlib.import_module(full_name) - imported += 1 - except Exception as e: - logger.debug("Failed to import handler module %s: %s", full_name, e) - if imported: - break + logger.debug("Failed to import handler module %s: %s", full_name, e) if not imported: logger.warning( @@ -126,8 +91,6 @@ def _auto_import_handlers() -> None: else: logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules)) -# Trigger auto-discovery of handlers at import time -_auto_import_handlers() async def _consume(loop: asyncio.AbstractEventLoop) -> None: import aio_pika @@ -171,6 +134,7 @@ async def _consume(loop: asyncio.AbstractEventLoop) -> None: await connection.close() logger.info("Worker stopped") + async def main() -> None: loop = asyncio.get_running_loop() @@ -194,7 +158,9 @@ async def main() -> None: with suppress(asyncio.CancelledError): await consumer + if __name__ == "__main__": + _auto_import_handlers() try: asyncio.run(main()) except KeyboardInterrupt: diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml index a81cc7a..8520408 100644 --- a/deployment/app-demo-worker-deployment.yaml +++ b/deployment/app-demo-worker-deployment.yaml @@ -17,8 +17,8 @@ spec: - image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b name: app-demo-worker command: - - python3 - - app/workers/queue_worker.py + - python3 -m + - app.workers.queue_worker env: - name: RABBITMQ_USERNAME value: demo-app