mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 06:57:47 +01:00
feat(infrastructure): use celery worker
This commit is contained in:
@@ -1,8 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
|
|
||||||
# Reuse existing RabbitMQ configuration env vars with vhost and optional TLS support
|
|
||||||
# If RABBITMQ_URL is provided, it takes precedence. Otherwise compose from parts.
|
|
||||||
if os.getenv("RABBITMQ_URL"):
|
if os.getenv("RABBITMQ_URL"):
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
|
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
|
||||||
else:
|
else:
|
||||||
@@ -27,26 +25,19 @@ else:
|
|||||||
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
|
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
|
||||||
RABBITMQ_URL += "/"
|
RABBITMQ_URL += "/"
|
||||||
|
|
||||||
# Default queue name to keep parity with the previous worker
|
|
||||||
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
|
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||||
|
|
||||||
# Use RPC backend by default to avoid coupling to Redis
|
|
||||||
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
|
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
|
||||||
|
|
||||||
celery_app = Celery(
|
celery_app = Celery(
|
||||||
"app",
|
"app",
|
||||||
broker=RABBITMQ_URL,
|
broker=RABBITMQ_URL,
|
||||||
backend=CELERY_BACKEND,
|
# backend=CELERY_BACKEND,
|
||||||
include=[
|
|
||||||
"app.workers.celery_tasks",
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
|
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
|
||||||
|
|
||||||
# Ensure this Celery app becomes the default for producers (e.g., FastAPI process)
|
|
||||||
# so that @shared_task.delay(...) uses the same broker/credentials as the worker.
|
|
||||||
celery_app.set_default()
|
celery_app.set_default()
|
||||||
|
|
||||||
# Basic, safe defaults – single prefetch helps fairness similarly to the old worker
|
|
||||||
celery_app.conf.update(
|
celery_app.conf.update(
|
||||||
task_default_queue=DEFAULT_QUEUE,
|
task_default_queue=DEFAULT_QUEUE,
|
||||||
task_acks_late=True,
|
task_acks_late=True,
|
||||||
@@ -54,8 +45,6 @@ celery_app.conf.update(
|
|||||||
task_serializer="json",
|
task_serializer="json",
|
||||||
result_serializer="json",
|
result_serializer="json",
|
||||||
accept_content=["json"],
|
accept_content=["json"],
|
||||||
broker_heartbeat=0, # let kombu handle heartbeats robustly in some envs
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Expose a shortcut for Celery CLI discovery: `celery -A app.celery_app worker ...`
|
|
||||||
__all__ = ["celery_app"]
|
__all__ = ["celery_app"]
|
||||||
|
|||||||
@@ -1,28 +1,6 @@
|
|||||||
# Route email jobs via Celery instead of raw RabbitMQ publishing
|
|
||||||
# Ensure Celery app is initialized so producers use correct broker credentials.
|
|
||||||
# Import has side effects (sets Celery default app), safe to keep at module level.
|
|
||||||
import app.celery_app # noqa: F401
|
import app.celery_app # noqa: F401
|
||||||
|
from app.workers.celery_tasks import send_email
|
||||||
# Use a lazy proxy so _send_email_task is never None, even if Celery modules
|
|
||||||
# are not yet importable at import time (e.g., different working dir during startup).
|
|
||||||
class _LazySendTask:
|
|
||||||
def delay(self, to: str, subject: str, body: str): # type: ignore[no-untyped-def]
|
|
||||||
# Late import on first use
|
|
||||||
from app.workers.celery_tasks import send_email_fields as _send # type: ignore
|
|
||||||
return _send.delay(to, subject, body)
|
|
||||||
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Try eager import for normal operation
|
|
||||||
from app.workers.celery_tasks import send_email_fields as _send_email_task # type: ignore
|
|
||||||
except ImportError: # pragma: no cover - fallback lazy import proxy
|
|
||||||
_send_email_task = _LazySendTask() # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
def enqueue_email(to: str, subject: str, body: str) -> None:
|
def enqueue_email(to: str, subject: str, body: str) -> None:
|
||||||
"""Enqueue an email job using Celery.
|
send_email.delay(to, subject, body)
|
||||||
|
|
||||||
Keeps the same public API as before.
|
|
||||||
"""
|
|
||||||
_send_email_task.delay(to, subject, body)
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
import logging
|
import logging
|
||||||
from typing import Any, Dict
|
|
||||||
|
|
||||||
from celery import shared_task
|
from celery import shared_task
|
||||||
|
|
||||||
@@ -11,34 +10,10 @@ logger.setLevel(logging.INFO)
|
|||||||
|
|
||||||
|
|
||||||
@shared_task(name="workers.send_email")
|
@shared_task(name="workers.send_email")
|
||||||
def send_email(payload: Dict[str, Any]) -> None:
|
def send_email(to: str, subject: str, body: str) -> None:
|
||||||
"""Celery task to send an email.
|
|
||||||
|
|
||||||
Expected payload schema:
|
|
||||||
{
|
|
||||||
"type": "email",
|
|
||||||
"to": "recipient@example.com",
|
|
||||||
"subject": "Subject text",
|
|
||||||
"body": "Email body text"
|
|
||||||
}
|
|
||||||
|
|
||||||
This mirrors the previous queue worker contract to minimize changes upstream.
|
|
||||||
"""
|
|
||||||
to = payload.get("to")
|
|
||||||
subject = payload.get("subject")
|
|
||||||
body = payload.get("body")
|
|
||||||
if not (to and subject and body):
|
if not (to and subject and body):
|
||||||
logger.error("Email task missing fields. Payload: %s", payload)
|
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Placeholder for real email sending logic
|
# Placeholder for real email sending logic
|
||||||
# Implement SMTP provider call here
|
|
||||||
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
|
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(name="workers.send_email_fields")
|
|
||||||
def send_email_fields(to: str, subject: str, body: str) -> None:
|
|
||||||
"""Alternate signature for convenience when calling from app code.
|
|
||||||
Routes to the same implementation as send_email.
|
|
||||||
"""
|
|
||||||
send_email({"type": "email", "to": to, "subject": subject, "body": body})
|
|
||||||
|
|||||||
Reference in New Issue
Block a user