import asyncio
from typing import Any, Dict

# Import decorator and logger from the worker so handlers can register themselves
from .queue_worker import register_task_handler, logger


@register_task_handler("email")
async def handle_email_task(payload: Dict[str, Any]) -> None:
    """Handle 'email' tasks dispatched by the queue worker.

    Expected payload schema:
        {
            "type": "email",
            "to": "recipient@example.com",
            "subject": "Subject text",
            "body": "Email body text"
        }

    Malformed payloads are logged and dropped; the queue worker acks the
    message either way (it consumes with requeue=False).
    """
    to = payload.get("to")
    subject = payload.get("subject")
    body = payload.get("body")
    if not (to and subject and body):
        # All three fields are required; fixed: this log call was split across
        # a broken line in the source, producing an unterminated string literal.
        logger.error("Email task missing fields. Payload: %s", payload)
        return

    # Placeholder for real email sending logic
    await asyncio.sleep(0)  # yield control, simulate async work
    logger.info("Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
async def _dispatch(message_dict: Dict[str, Any]) -> None:
    """Route a decoded message to the handler registered for its 'type'.

    Messages without a 'type' key, or whose type has no registered handler,
    are logged and skipped (the worker acks them regardless).
    """
    task_type = message_dict.get("type")
    if not task_type:
        logger.warning("Skipping message without 'type': %s", message_dict)
        return
    handler = _TASK_HANDLERS.get(task_type)
    if not handler:
        logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
        return
    await handler(message_dict)


# Auto-load all handler modules in this package so decorators can register
# without manual imports. Any file matching '*_handler.py' will be imported.

def _auto_import_handlers() -> None:
    """Discover and import sibling '*_handler' modules so that their
    @register_task_handler decorators populate _TASK_HANDLERS.

    Works both when this module is imported as part of a package and when it
    is executed as a plain script (no __package__): in the script case the
    backend directory is put on sys.path and this module is aliased as
    'app.workers.queue_worker' so relative imports in handlers resolve.
    """
    import importlib
    import pkgutil
    import sys

    pkg_name = __package__  # e.g., 'backend.app.workers' when run as module
    dir_path = os.path.dirname(__file__)  # uses module-level 'os' import

    # Discover handler module base names in the filesystem
    handler_modules = [
        name
        for _finder, name, ispkg in pkgutil.iter_modules([dir_path])
        if not ispkg and name.endswith("_handler")
    ]

    if not handler_modules:
        logger.info("No handler modules discovered in %s", dir_path)
        return

    imported = 0

    # Prefer importing using the current package name so relative imports in
    # handler modules (e.g., from .queue_worker import register_task_handler)
    # continue to work.
    bases_to_try = []
    if pkg_name:
        bases_to_try.append(pkg_name)

    # If executed as a script (no package), make 'app' importable and alias this
    # module as 'app.workers.queue_worker' so that handler relative imports work
    if not pkg_name:
        backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))  # .../backend
        if backend_dir not in sys.path:
            sys.path.insert(0, backend_dir)
        try:
            importlib.import_module("app.workers")
            # Alias the current module under the package name to satisfy
            # 'from .queue_worker import ...' inside handlers without re-importing
            sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__])
        except Exception as e:
            logger.debug("Failed to prepare package context for handlers: %s", e)
        # Ensure we try importing under 'app.workers'
        if "app.workers" not in bases_to_try:
            bases_to_try.append("app.workers")

    # Also try common alternative base when the whole project is a package
    for base in ("backend.app.workers", "app.workers"):
        if base not in bases_to_try:
            bases_to_try.append(base)

    for base in bases_to_try:
        # The base package itself must import before its submodules can.
        try:
            importlib.import_module(base)
        except Exception:
            continue
        for mod in handler_modules:
            full_name = f"{base}.{mod}"
            try:
                importlib.import_module(full_name)
                imported += 1
            except Exception as e:
                logger.debug("Failed to import handler module %s: %s", full_name, e)
        if imported:
            # First base that yields any handler wins; don't double-register.
            break

    if not imported:
        logger.warning(
            "Failed to import any handler modules. Ensure the worker is executed "
            "as a module (e.g., 'python -m backend.app.workers.queue_worker')."
        )
    else:
        logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))
# Trigger auto-discovery of handlers at import time
_auto_import_handlers()


async def _consume(loop: asyncio.AbstractEventLoop) -> None:
    """Connect to RabbitMQ and process messages from QUEUE_NAME until cancelled.

    Each message body is JSON-decoded and routed via _dispatch(); decode or
    handler errors are logged and the message is dropped (requeue=False).
    The connection is always closed on exit.
    """
    import aio_pika

    # Mask password in URL for logging
    safe_url = RABBITMQ_URL
    with suppress(Exception):
        if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL:
            scheme, rest = RABBITMQ_URL.split("://", 1)
            creds_host = rest.split("@")
            if len(creds_host) == 2:
                user_pass, host = creds_host
                user = user_pass.split(":")[0]
                safe_url = f"{scheme}://{user}:***@{host}"

    logger.info(
        "Starting worker | pid=%s | host=%s | queue=%s | url=%s",
        os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url,
    )

    # NOTE(review): aio-pika >= 7 removed the 'loop' kwarg from connect_robust —
    # confirm the pinned aio-pika version before upgrading.
    connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception as e:
                        logger.exception("Failed to decode message: %s", e)
                        continue
                    try:
                        await _dispatch(payload)
                    except Exception:
                        logger.exception("Unhandled error while processing message: %s", payload)
                        # message is not requeued due to process(requeue=False)
    finally:
        await connection.close()
        logger.info("Worker stopped")


async def main() -> None:
    """Run the consumer until SIGINT/SIGTERM arrives or the consumer exits.

    Fix over the original: the original awaited only stop_event.wait(), so if
    the consumer task died first (e.g. broker unreachable) the process hung
    forever with no active consumer. We now wait for whichever finishes first.
    """
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def _request_shutdown(signame: str) -> None:
        logger.info("Received %s, shutting down gracefully…", signame)
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler raises NotImplementedError on platforms without
        # signal support in the event loop (e.g. Windows ProactorEventLoop).
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _request_shutdown, sig.name)

    consumer = asyncio.create_task(_consume(loop))
    stopper = asyncio.create_task(stop_event.wait())

    # Wait for EITHER a stop signal OR the consumer finishing on its own.
    await asyncio.wait({consumer, stopper}, return_when=asyncio.FIRST_COMPLETED)

    stopper.cancel()
    consumer.cancel()
    with suppress(asyncio.CancelledError):
        # Re-raises any non-cancellation error from the consumer, surfacing
        # connection failures instead of swallowing them.
        await consumer


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
- - async with queue.iterator() as queue_iter: - async for message in queue_iter: - async with message.process(requeue=False): - await handle_message(message.body) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml index f463a9c..a81cc7a 100644 --- a/deployment/app-demo-worker-deployment.yaml +++ b/deployment/app-demo-worker-deployment.yaml @@ -18,7 +18,7 @@ spec: name: app-demo-worker command: - python3 - - worker/email_worker.py + - app/workers/queue_worker.py env: - name: RABBITMQ_USERNAME value: demo-app