"""RabbitMQ queue worker: consumes mail tasks and dispatches them to registered handlers."""

import asyncio
import json
import logging
import os
import signal
import socket
from contextlib import suppress
from typing import Any, Awaitable, Callable, Dict

# Build RabbitMQ URL consistent with core.queue
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
    f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
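
# Note: with no environment overrides, the fallback above resolves to
# "amqp://user:bitnami123@localhost:5672"; in practice the credentials and host
# are expected to come from the deployment environment.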

# Logging configuration
logger = logging.getLogger("queue_worker")
logger.propagate = False  # prevent double logging via root logger
if not logger.handlers:  # guard against multiple imports creating duplicate handlers
    _handler = logging.StreamHandler()
    _formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)
logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper())

# Task handler registry
TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
_TASK_HANDLERS: Dict[str, TaskHandler] = {}


def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
    """Decorator that registers an async handler for messages of the given task type."""

    def decorator(func: TaskHandler) -> TaskHandler:
        _TASK_HANDLERS[task_type] = func
        logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__)
        return func

    return decorator
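
# Usage sketch (illustrative only; the module name, task type, and payload fields below
# are hypothetical, but the '*_handler.py' naming matches the auto-import further down):
#
#     # send_mail_handler.py, placed next to this module
#     from .queue_worker import register_task_handler
#
#     @register_task_handler("send_mail")
#     async def handle_send_mail(message: Dict[str, Any]) -> None:
#         ...  # e.g. read message["to"] / message["subject"] and deliver the mail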


async def _dispatch(message_dict: Dict[str, Any]) -> None:
    """Look up the handler registered for the message's 'type' and invoke it."""
    task_type = message_dict.get("type")
    if not task_type:
        logger.warning("Skipping message without 'type': %s", message_dict)
        return
    handler = _TASK_HANDLERS.get(task_type)
    if not handler:
        logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
        return
    await handler(message_dict)
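
# For reference, a dispatched message is a JSON object whose 'type' field selects the
# handler, e.g. {"type": "send_mail", ...}; all other fields are defined by the producer
# (the example type here is hypothetical).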


# Auto-load all handler modules in this package so decorators can register
# without manual imports. Any file matching '*_handler.py' will be imported.
def _auto_import_handlers() -> None:
    """Discover and import every '*_handler' module in this package so handlers register."""
    import importlib
    import os
    import pkgutil
    import sys

    pkg_name = __package__  # e.g., 'backend.app.workers' when run as a module
    dir_path = os.path.dirname(__file__)

    # Discover handler module base names in the filesystem
    handler_modules = []
    for finder, name, ispkg in pkgutil.iter_modules([dir_path]):
        if not ispkg and name.endswith("_handler"):
            handler_modules.append(name)

    if not handler_modules:
        logger.info("No handler modules discovered in %s", dir_path)
        return

    imported = 0

    # Prefer importing under the current package name so relative imports in
    # handler modules (e.g., 'from .queue_worker import register_task_handler')
    # continue to work.
    bases_to_try = []
    if pkg_name:
        bases_to_try.append(pkg_name)

    # If executed as a script (no package), make 'app' importable and alias this
    # module as 'app.workers.queue_worker' so that handler relative imports work.
    if not pkg_name:
        backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))  # .../backend
        if backend_dir not in sys.path:
            sys.path.insert(0, backend_dir)
        try:
            importlib.import_module("app.workers")
            # Alias the current module under the package name to satisfy
            # 'from .queue_worker import ...' inside handlers without re-importing it
            sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__])
        except Exception as e:
            logger.debug("Failed to prepare package context for handlers: %s", e)
        # Ensure we try importing under 'app.workers'
        if "app.workers" not in bases_to_try:
            bases_to_try.append("app.workers")

    # Also try common alternative bases when the whole project is a package
    for base in ("backend.app.workers", "app.workers"):
        if base not in bases_to_try:
            bases_to_try.append(base)

    for base in bases_to_try:
        try:
            importlib.import_module(base)
        except Exception:
            continue
        for mod in handler_modules:
            full_name = f"{base}.{mod}"
            try:
                importlib.import_module(full_name)
                imported += 1
            except Exception as e:
                logger.debug("Failed to import handler module %s: %s", full_name, e)
        if imported:
            break

    if not imported:
        logger.warning(
            "Failed to import any handler modules. Ensure the worker is executed "
            "as a module (e.g., 'python -m backend.app.workers.queue_worker')."
        )
    else:
        logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))


# Trigger auto-discovery of handlers at import time
_auto_import_handlers()


async def _consume(loop: asyncio.AbstractEventLoop) -> None:
    """Connect to RabbitMQ and process messages from the queue until cancelled."""
    import aio_pika

    # Mask the password in the URL before logging it
    safe_url = RABBITMQ_URL
    with suppress(Exception):
        if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL:
            scheme, rest = RABBITMQ_URL.split("://", 1)
            creds_host = rest.split("@")
            if len(creds_host) == 2:
                user_pass, host = creds_host
                user = user_pass.split(":")[0]
                safe_url = f"{scheme}://{user}:***@{host}"

    logger.info(
        "Starting worker | pid=%s | host=%s | queue=%s | url=%s",
        os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url,
    )

    connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception as e:
                        logger.exception("Failed to decode message: %s", e)
                        continue
                    try:
                        await _dispatch(payload)
                    except Exception:
                        logger.exception("Unhandled error while processing message: %s", payload)
                        # message is not requeued because of process(requeue=False)
    finally:
        await connection.close()
        logger.info("Worker stopped")
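
# Producer-side sketch (illustrative; the real publisher lives in core.queue, per the
# comment near the top of this file). It shows how a JSON task could be placed on the
# same durable queue with aio_pika:
#
#     async def publish_task(task: dict) -> None:
#         connection = await aio_pika.connect_robust(RABBITMQ_URL)
#         async with connection:
#             channel = await connection.channel()
#             await channel.declare_queue(QUEUE_NAME, durable=True)
#             await channel.default_exchange.publish(
#                 aio_pika.Message(body=json.dumps(task).encode("utf-8")),
#                 routing_key=QUEUE_NAME,
#             )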


async def main() -> None:
    """Run the consumer until SIGINT/SIGTERM is received, then shut down gracefully."""
    loop = asyncio.get_running_loop()

    stop_event = asyncio.Event()

    def _request_shutdown(signame: str) -> None:
        logger.info("Received %s, shutting down gracefully…", signame)
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _request_shutdown, sig.name)

    consumer = asyncio.create_task(_consume(loop))

    # Wait until a stop signal arrives
    await stop_event.wait()

    # Cancel the consumer and wait for it to finish
    consumer.cancel()
    with suppress(asyncio.CancelledError):
        await consumer


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
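
# To run the worker, execute this file as a module so package-relative imports in the
# handler modules resolve (this mirrors the hint logged by _auto_import_handlers):
#
#     python -m backend.app.workers.queue_worker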