mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 06:57:47 +01:00
feat(infrastructure): update queue worker
This commit is contained in:
0
backend/app/workers/__main__.py
Normal file
0
backend/app/workers/__main__.py
Normal file
@@ -7,7 +7,6 @@ import socket
|
|||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
from typing import Any, Awaitable, Callable, Dict
|
from typing import Any, Awaitable, Callable, Dict
|
||||||
|
|
||||||
# Build RabbitMQ URL consistent with core.queue
|
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
||||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
||||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
||||||
@@ -16,10 +15,9 @@ RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
|||||||
)
|
)
|
||||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||||
|
|
||||||
# Logging configuration
|
|
||||||
logger = logging.getLogger("queue_worker")
|
logger = logging.getLogger("queue_worker")
|
||||||
logger.propagate = False # prevent double logging via root logger
|
logger.propagate = False # prevent double logging
|
||||||
if not logger.handlers: # guard against multiple imports creating duplicate handlers
|
if not logger.handlers: # do not duplicate handlers
|
||||||
_handler = logging.StreamHandler()
|
_handler = logging.StreamHandler()
|
||||||
_formatter = logging.Formatter(
|
_formatter = logging.Formatter(
|
||||||
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
||||||
@@ -29,17 +27,19 @@ if not logger.handlers: # guard against multiple imports creating duplicate han
|
|||||||
logger.addHandler(_handler)
|
logger.addHandler(_handler)
|
||||||
logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper())
|
logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper())
|
||||||
|
|
||||||
# Task handler registry
|
|
||||||
TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
|
TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
|
||||||
_TASK_HANDLERS: Dict[str, TaskHandler] = {}
|
_TASK_HANDLERS: Dict[str, TaskHandler] = {}
|
||||||
|
|
||||||
|
|
||||||
def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
|
def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
|
||||||
def decorator(func: TaskHandler) -> TaskHandler:
|
def decorator(func: TaskHandler) -> TaskHandler:
|
||||||
_TASK_HANDLERS[task_type] = func
|
_TASK_HANDLERS[task_type] = func
|
||||||
logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__)
|
logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__)
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
async def _dispatch(message_dict: Dict[str, Any]) -> None:
|
async def _dispatch(message_dict: Dict[str, Any]) -> None:
|
||||||
task_type = message_dict.get("type")
|
task_type = message_dict.get("type")
|
||||||
if not task_type:
|
if not task_type:
|
||||||
@@ -51,18 +51,17 @@ async def _dispatch(message_dict: Dict[str, Any]) -> None:
|
|||||||
return
|
return
|
||||||
await handler(message_dict)
|
await handler(message_dict)
|
||||||
|
|
||||||
# Auto-load all handler modules in this package so decorators can register
|
|
||||||
# without manual imports. Any file matching '*_handler.py' will be imported.
|
|
||||||
|
|
||||||
def _auto_import_handlers() -> None:
|
def _auto_import_handlers() -> None:
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
import pkgutil
|
import pkgutil
|
||||||
import sys
|
|
||||||
|
|
||||||
pkg_name = __package__ # e.g., 'backend.app.workers' when run as module
|
pkg_name = __package__ # e.g., 'backend.app.workers' when run as module
|
||||||
dir_path = os.path.dirname(__file__)
|
dir_path = os.path.dirname(__file__)
|
||||||
|
|
||||||
|
print(f"Package: {pkg_name}, Dir: {dir_path}")
|
||||||
|
|
||||||
# Discover handler module base names in the filesystem
|
# Discover handler module base names in the filesystem
|
||||||
handler_modules = []
|
handler_modules = []
|
||||||
for finder, name, ispkg in pkgutil.iter_modules([dir_path]):
|
for finder, name, ispkg in pkgutil.iter_modules([dir_path]):
|
||||||
@@ -75,48 +74,14 @@ def _auto_import_handlers() -> None:
|
|||||||
|
|
||||||
imported = 0
|
imported = 0
|
||||||
|
|
||||||
# Prefer importing using the current package name so relative imports in
|
importlib.import_module(__package__)
|
||||||
# handler modules (e.g., from .queue_worker import register_task_handler)
|
|
||||||
# continue to work.
|
|
||||||
bases_to_try = []
|
|
||||||
if pkg_name:
|
|
||||||
bases_to_try.append(pkg_name)
|
|
||||||
|
|
||||||
# If executed as a script (no package), make 'app' importable so that handler modules
|
|
||||||
# can be imported using absolute imports. Handler modules must use absolute imports
|
|
||||||
# (e.g., 'from backend.app.workers.queue_worker import register_task_handler').
|
|
||||||
if not pkg_name:
|
|
||||||
backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend
|
|
||||||
if backend_dir not in sys.path:
|
|
||||||
sys.path.insert(0, backend_dir)
|
|
||||||
try:
|
|
||||||
importlib.import_module("app.workers")
|
|
||||||
sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__])
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug("Failed to prepare package context for handlers: %s", e)
|
|
||||||
# Ensure we try importing under 'app.workers'
|
|
||||||
if "app.workers" not in bases_to_try:
|
|
||||||
bases_to_try.append("app.workers")
|
|
||||||
|
|
||||||
# Also try common alternative base when the whole project is a package
|
|
||||||
for base in ("backend.app.workers", "app.workers"):
|
|
||||||
if base not in bases_to_try:
|
|
||||||
bases_to_try.append(base)
|
|
||||||
|
|
||||||
for base in bases_to_try:
|
|
||||||
try:
|
|
||||||
importlib.import_module(base)
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
for mod in handler_modules:
|
for mod in handler_modules:
|
||||||
full_name = f"{base}.{mod}"
|
full_name = f"{__package__}.{mod}"
|
||||||
try:
|
try:
|
||||||
importlib.import_module(full_name)
|
importlib.import_module(full_name)
|
||||||
imported += 1
|
imported += 1
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Failed to import handler module %s: %s", full_name, e)
|
logger.debug("Failed to import handler module %s: %s", full_name, e)
|
||||||
if imported:
|
|
||||||
break
|
|
||||||
|
|
||||||
if not imported:
|
if not imported:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@@ -126,8 +91,6 @@ def _auto_import_handlers() -> None:
|
|||||||
else:
|
else:
|
||||||
logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))
|
logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))
|
||||||
|
|
||||||
# Trigger auto-discovery of handlers at import time
|
|
||||||
_auto_import_handlers()
|
|
||||||
|
|
||||||
async def _consume(loop: asyncio.AbstractEventLoop) -> None:
|
async def _consume(loop: asyncio.AbstractEventLoop) -> None:
|
||||||
import aio_pika
|
import aio_pika
|
||||||
@@ -171,6 +134,7 @@ async def _consume(loop: asyncio.AbstractEventLoop) -> None:
|
|||||||
await connection.close()
|
await connection.close()
|
||||||
logger.info("Worker stopped")
|
logger.info("Worker stopped")
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
@@ -194,7 +158,9 @@ async def main() -> None:
|
|||||||
with suppress(asyncio.CancelledError):
|
with suppress(asyncio.CancelledError):
|
||||||
await consumer
|
await consumer
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
_auto_import_handlers()
|
||||||
try:
|
try:
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ spec:
|
|||||||
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
|
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
|
||||||
name: app-demo-worker
|
name: app-demo-worker
|
||||||
command:
|
command:
|
||||||
- python3
|
- python3 -m
|
||||||
- app/workers/queue_worker.py
|
- app.workers.queue_worker
|
||||||
env:
|
env:
|
||||||
- name: RABBITMQ_USERNAME
|
- name: RABBITMQ_USERNAME
|
||||||
value: demo-app
|
value: demo-app
|
||||||
|
|||||||
Reference in New Issue
Block a user