mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 15:12:08 +01:00
feat(infrastructure): use celery worker
This commit is contained in:
@@ -1,35 +1,28 @@
|
|||||||
import json
|
# Route email jobs via Celery instead of raw RabbitMQ publishing
|
||||||
import os
|
# Ensure Celery app is initialized so producers use correct broker credentials.
|
||||||
from typing import Any, Dict
|
# Import has side effects (sets Celery default app), safe to keep at module level.
|
||||||
import asyncio
|
import app.celery_app # noqa: F401
|
||||||
|
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
# Use a lazy proxy so _send_email_task is never None, even if Celery modules
|
||||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
# are not yet importable at import time (e.g., different working dir during startup).
|
||||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
class _LazySendTask:
|
||||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
def delay(self, to: str, subject: str, body: str): # type: ignore[no-untyped-def]
|
||||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
# Late import on first use
|
||||||
)
|
from app.workers.celery_tasks import send_email_fields as _send # type: ignore
|
||||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
return _send.delay(to, subject, body)
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Try eager import for normal operation
|
||||||
|
from app.workers.celery_tasks import send_email_fields as _send_email_task # type: ignore
|
||||||
|
except ImportError: # pragma: no cover - fallback lazy import proxy
|
||||||
|
_send_email_task = _LazySendTask() # type: ignore
|
||||||
|
|
||||||
async def _publish_async(message: Dict[str, Any]) -> None:
|
|
||||||
import aio_pika
|
|
||||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
|
||||||
try:
|
|
||||||
channel = await connection.channel()
|
|
||||||
await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
||||||
body = json.dumps(message).encode("utf-8")
|
|
||||||
await channel.default_exchange.publish(
|
|
||||||
aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
|
|
||||||
routing_key=QUEUE_NAME,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
await connection.close()
|
|
||||||
|
|
||||||
def enqueue_email(to: str, subject: str, body: str) -> None:
|
def enqueue_email(to: str, subject: str, body: str) -> None:
|
||||||
message = {"type": "email", "to": to, "subject": subject, "body": body}
|
"""Enqueue an email job using Celery.
|
||||||
try:
|
|
||||||
loop = asyncio.get_running_loop()
|
Keeps the same public API as before.
|
||||||
loop.create_task(_publish_async(message))
|
"""
|
||||||
except RuntimeError:
|
_send_email_task.delay(to, subject, body)
|
||||||
asyncio.run(_publish_async(message))
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
|
|||||||
from fastapi_users.authentication import (
|
from fastapi_users.authentication import (
|
||||||
AuthenticationBackend,
|
AuthenticationBackend,
|
||||||
BearerTransport,
|
BearerTransport,
|
||||||
JWTStrategy,
|
|
||||||
)
|
)
|
||||||
|
from fastapi_users.authentication.strategy.jwt import JWTStrategy
|
||||||
from fastapi_users.db import SQLAlchemyUserDatabase
|
from fastapi_users.db import SQLAlchemyUserDatabase
|
||||||
|
|
||||||
from app.models.user import User
|
from app.models.user import User
|
||||||
@@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
|||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
enqueue_email(to=user.email, subject=subject, body=body)
|
enqueue_email(to=user.email, subject=subject, body=body)
|
||||||
except Exception:
|
except Exception as e:
|
||||||
print("[Email Fallback] To:", user.email)
|
print("[Email Fallback] To:", user.email)
|
||||||
print("[Email Fallback] Subject:", subject)
|
print("[Email Fallback] Subject:", subject)
|
||||||
print("[Email Fallback] Body:\n", body)
|
print("[Email Fallback] Body:\n", body)
|
||||||
|
|||||||
@@ -1,29 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from typing import Any, Dict
|
|
||||||
|
|
||||||
# Import decorator and logger from the worker so handlers can register themselves
|
|
||||||
from .queue_worker import register_task_handler, logger
|
|
||||||
|
|
||||||
|
|
||||||
@register_task_handler("email")
|
|
||||||
async def handle_email_task(payload: Dict[str, Any]) -> None:
|
|
||||||
"""Handle 'email' tasks dispatched by the queue worker.
|
|
||||||
|
|
||||||
Expected payload schema:
|
|
||||||
{
|
|
||||||
"type": "email",
|
|
||||||
"to": "recipient@example.com",
|
|
||||||
"subject": "Subject text",
|
|
||||||
"body": "Email body text"
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
to = payload.get("to")
|
|
||||||
subject = payload.get("subject")
|
|
||||||
body = payload.get("body")
|
|
||||||
if not (to and subject and body):
|
|
||||||
logger.error("Email task missing fields. Payload: %s", payload)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Placeholder for real email sending logic
|
|
||||||
await asyncio.sleep(0) # yield control, simulate async work
|
|
||||||
logger.info("Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
|
|
||||||
@@ -1,167 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
import socket
|
|
||||||
from contextlib import suppress
|
|
||||||
from typing import Any, Awaitable, Callable, Dict
|
|
||||||
|
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
|
||||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
|
||||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
|
||||||
)
|
|
||||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
|
||||||
|
|
||||||
logger = logging.getLogger("queue_worker")
|
|
||||||
logger.propagate = False # prevent double logging
|
|
||||||
if not logger.handlers: # do not duplicate handlers
|
|
||||||
_handler = logging.StreamHandler()
|
|
||||||
_formatter = logging.Formatter(
|
|
||||||
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
|
||||||
datefmt="%Y-%m-%d %H:%M:%S",
|
|
||||||
)
|
|
||||||
_handler.setFormatter(_formatter)
|
|
||||||
logger.addHandler(_handler)
|
|
||||||
logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper())
|
|
||||||
|
|
||||||
TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]]
|
|
||||||
_TASK_HANDLERS: Dict[str, TaskHandler] = {}
|
|
||||||
|
|
||||||
|
|
||||||
def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]:
|
|
||||||
def decorator(func: TaskHandler) -> TaskHandler:
|
|
||||||
_TASK_HANDLERS[task_type] = func
|
|
||||||
logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__)
|
|
||||||
return func
|
|
||||||
|
|
||||||
return decorator
|
|
||||||
|
|
||||||
|
|
||||||
async def _dispatch(message_dict: Dict[str, Any]) -> None:
|
|
||||||
task_type = message_dict.get("type")
|
|
||||||
if not task_type:
|
|
||||||
logger.warning("Skipping message without 'type': %s", message_dict)
|
|
||||||
return
|
|
||||||
handler = _TASK_HANDLERS.get(task_type)
|
|
||||||
if not handler:
|
|
||||||
logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
|
|
||||||
return
|
|
||||||
await handler(message_dict)
|
|
||||||
|
|
||||||
|
|
||||||
def _auto_import_handlers() -> None:
|
|
||||||
import importlib
|
|
||||||
import os
|
|
||||||
import pkgutil
|
|
||||||
|
|
||||||
pkg_name = __package__ # e.g., 'backend.app.workers' when run as module
|
|
||||||
dir_path = os.path.dirname(__file__)
|
|
||||||
|
|
||||||
print(f"Package: {pkg_name}, Dir: {dir_path}")
|
|
||||||
|
|
||||||
# Discover handler module base names in the filesystem
|
|
||||||
handler_modules = []
|
|
||||||
for finder, name, ispkg in pkgutil.iter_modules([dir_path]):
|
|
||||||
if not ispkg and name.endswith("_handler"):
|
|
||||||
handler_modules.append(name)
|
|
||||||
|
|
||||||
if not handler_modules:
|
|
||||||
logger.info("No handler modules discovered in %s", dir_path)
|
|
||||||
return
|
|
||||||
|
|
||||||
imported = 0
|
|
||||||
|
|
||||||
importlib.import_module(__package__)
|
|
||||||
for mod in handler_modules:
|
|
||||||
full_name = f"{__package__}.{mod}"
|
|
||||||
try:
|
|
||||||
importlib.import_module(full_name)
|
|
||||||
imported += 1
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug("Failed to import handler module %s: %s", full_name, e)
|
|
||||||
|
|
||||||
if not imported:
|
|
||||||
logger.warning(
|
|
||||||
"Failed to import any handler modules. Ensure the worker is executed "
|
|
||||||
"as a module (e.g., 'python -m backend.app.workers.queue_worker') or as a script (e.g., 'python3 app/workers/queue_worker.py')."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))
|
|
||||||
|
|
||||||
|
|
||||||
async def _consume(loop: asyncio.AbstractEventLoop) -> None:
|
|
||||||
import aio_pika
|
|
||||||
|
|
||||||
# Mask password in URL for logging
|
|
||||||
safe_url = RABBITMQ_URL
|
|
||||||
with suppress(Exception):
|
|
||||||
if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL:
|
|
||||||
scheme, rest = RABBITMQ_URL.split("://", 1)
|
|
||||||
creds_host = rest.split("@")
|
|
||||||
if len(creds_host) == 2:
|
|
||||||
user_pass, host = creds_host
|
|
||||||
user = user_pass.split(":")[0]
|
|
||||||
safe_url = f"{scheme}://{user}:***@{host}"
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Starting worker | pid=%s | host=%s | queue=%s | url=%s",
|
|
||||||
os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url,
|
|
||||||
)
|
|
||||||
|
|
||||||
connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
|
|
||||||
try:
|
|
||||||
channel = await connection.channel()
|
|
||||||
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
||||||
logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)
|
|
||||||
|
|
||||||
async with queue.iterator() as queue_iter:
|
|
||||||
async for message in queue_iter:
|
|
||||||
async with message.process(requeue=False):
|
|
||||||
try:
|
|
||||||
payload = json.loads(message.body.decode("utf-8"))
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception("Failed to decode message: %s", e)
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
await _dispatch(payload)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Unhandled error while processing message: %s", payload)
|
|
||||||
# message is not requeued due to process(requeue=False)
|
|
||||||
finally:
|
|
||||||
await connection.close()
|
|
||||||
logger.info("Worker stopped")
|
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
|
|
||||||
stop_event = asyncio.Event()
|
|
||||||
|
|
||||||
def _request_shutdown(signame: str) -> None:
|
|
||||||
logger.info("Received %s, shutting down gracefully…", signame)
|
|
||||||
stop_event.set()
|
|
||||||
|
|
||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
||||||
with suppress(NotImplementedError):
|
|
||||||
loop.add_signal_handler(sig, _request_shutdown, sig.name)
|
|
||||||
|
|
||||||
consumer = asyncio.create_task(_consume(loop))
|
|
||||||
|
|
||||||
# Wait until a stop signal arrives
|
|
||||||
await stop_event.wait()
|
|
||||||
|
|
||||||
# Give the consumer some time to exit
|
|
||||||
consumer.cancel()
|
|
||||||
with suppress(asyncio.CancelledError):
|
|
||||||
await consumer
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
_auto_import_handlers()
|
|
||||||
try:
|
|
||||||
asyncio.run(main())
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
@@ -2,14 +2,20 @@ aio-pika==9.5.6
|
|||||||
aiormq==6.8.1
|
aiormq==6.8.1
|
||||||
aiosqlite==0.21.0
|
aiosqlite==0.21.0
|
||||||
alembic==1.16.5
|
alembic==1.16.5
|
||||||
|
amqp==5.3.1
|
||||||
annotated-types==0.7.0
|
annotated-types==0.7.0
|
||||||
anyio==4.11.0
|
anyio==4.11.0
|
||||||
argon2-cffi==23.1.0
|
argon2-cffi==23.1.0
|
||||||
argon2-cffi-bindings==25.1.0
|
argon2-cffi-bindings==25.1.0
|
||||||
asyncmy==0.2.9
|
asyncmy==0.2.9
|
||||||
bcrypt==4.3.0
|
bcrypt==4.3.0
|
||||||
|
billiard==4.2.2
|
||||||
|
celery==5.5.3
|
||||||
cffi==2.0.0
|
cffi==2.0.0
|
||||||
click==8.1.8
|
click==8.1.8
|
||||||
|
click-didyoumean==0.3.1
|
||||||
|
click-plugins==1.1.1.2
|
||||||
|
click-repl==0.3.0
|
||||||
cryptography==46.0.1
|
cryptography==46.0.1
|
||||||
dnspython==2.7.0
|
dnspython==2.7.0
|
||||||
email_validator==2.2.0
|
email_validator==2.2.0
|
||||||
@@ -21,11 +27,14 @@ greenlet==3.2.4
|
|||||||
h11==0.16.0
|
h11==0.16.0
|
||||||
httptools==0.6.4
|
httptools==0.6.4
|
||||||
idna==3.10
|
idna==3.10
|
||||||
|
kombu==5.5.4
|
||||||
makefun==1.16.0
|
makefun==1.16.0
|
||||||
Mako==1.3.10
|
Mako==1.3.10
|
||||||
MarkupSafe==3.0.2
|
MarkupSafe==3.0.2
|
||||||
multidict==6.6.4
|
multidict==6.6.4
|
||||||
|
packaging==25.0
|
||||||
pamqp==3.3.0
|
pamqp==3.3.0
|
||||||
|
prompt_toolkit==3.0.52
|
||||||
propcache==0.3.2
|
propcache==0.3.2
|
||||||
pwdlib==0.2.1
|
pwdlib==0.2.1
|
||||||
pycparser==2.23
|
pycparser==2.23
|
||||||
@@ -33,17 +42,22 @@ pydantic==2.11.9
|
|||||||
pydantic_core==2.33.2
|
pydantic_core==2.33.2
|
||||||
PyJWT==2.10.1
|
PyJWT==2.10.1
|
||||||
PyMySQL==1.1.2
|
PyMySQL==1.1.2
|
||||||
|
python-dateutil==2.9.0.post0
|
||||||
python-dotenv==1.1.1
|
python-dotenv==1.1.1
|
||||||
python-multipart==0.0.20
|
python-multipart==0.0.20
|
||||||
PyYAML==6.0.2
|
PyYAML==6.0.2
|
||||||
|
six==1.17.0
|
||||||
sniffio==1.3.1
|
sniffio==1.3.1
|
||||||
SQLAlchemy==2.0.43
|
SQLAlchemy==2.0.43
|
||||||
starlette==0.48.0
|
starlette==0.48.0
|
||||||
tomli==2.2.1
|
tomli==2.2.1
|
||||||
typing-inspection==0.4.1
|
typing-inspection==0.4.1
|
||||||
typing_extensions==4.15.0
|
typing_extensions==4.15.0
|
||||||
|
tzdata==2025.2
|
||||||
uvicorn==0.37.0
|
uvicorn==0.37.0
|
||||||
uvloop==0.21.0
|
uvloop==0.21.0
|
||||||
|
vine==5.1.0
|
||||||
watchfiles==1.1.0
|
watchfiles==1.1.0
|
||||||
|
wcwidth==0.2.14
|
||||||
websockets==15.0.1
|
websockets==15.0.1
|
||||||
yarl==1.20.1
|
yarl==1.20.1
|
||||||
|
|||||||
@@ -17,8 +17,14 @@ spec:
|
|||||||
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
|
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
|
||||||
name: app-demo-worker
|
name: app-demo-worker
|
||||||
command:
|
command:
|
||||||
- python3 -m
|
- celery
|
||||||
- app.workers.queue_worker
|
- -A
|
||||||
|
- app.celery_app
|
||||||
|
- worker
|
||||||
|
- -Q
|
||||||
|
- $(MAIL_QUEUE)
|
||||||
|
- --loglevel
|
||||||
|
- INFO
|
||||||
env:
|
env:
|
||||||
- name: RABBITMQ_USERNAME
|
- name: RABBITMQ_USERNAME
|
||||||
value: demo-app
|
value: demo-app
|
||||||
|
|||||||
Reference in New Issue
Block a user