feat(infrastructure): use celery worker

This commit is contained in:
2025-10-02 14:32:37 +02:00
parent 49efd88f29
commit 9a436d3c70
2 changed files with 105 additions and 0 deletions

61
backend/app/celery_app.py Normal file
View File

@@ -0,0 +1,61 @@
import os
from celery import Celery
# Reuse existing RabbitMQ configuration env vars with vhost and optional TLS support.
# If RABBITMQ_URL is provided, it takes precedence. Otherwise compose from parts.
if os.getenv("RABBITMQ_URL"):
    RABBITMQ_URL = os.getenv("RABBITMQ_URL")  # type: ignore
else:
    from urllib.parse import quote

    # Percent-encode credentials: characters such as '@', '/' or ':' in the
    # username/password would otherwise corrupt the AMQP URL structure.
    username = quote(os.getenv("RABBITMQ_USERNAME", "user"), safe="")
    password = quote(os.getenv("RABBITMQ_PASSWORD", "bitnami123"), safe="")
    host = os.getenv("RABBITMQ_HOST", "localhost")
    port = os.getenv("RABBITMQ_PORT", "5672")
    vhost = os.getenv("RABBITMQ_VHOST", "/")
    use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
    scheme = "amqps" if use_ssl else "amqp"
    # Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
    if vhost in ("/", ""):
        # Default vhost: e.g. amqp://user:pass@host:5672//
        vhost_path = "//"
    else:
        vhost_path = f"/{quote(vhost, safe='')}"
    RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"

# Default queue name to keep parity with the previous worker.
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")

# Use RPC backend by default to avoid coupling to Redis.
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")

celery_app = Celery(
    "app",
    broker=RABBITMQ_URL,
    backend=CELERY_BACKEND,
    include=[
        "app.workers.celery_tasks",
    ],
)

# Ensure this Celery app becomes the default for producers (e.g., FastAPI process)
# so that @shared_task.delay(...) uses the same broker/credentials as the worker.
celery_app.set_default()

# Basic, safe defaults: a single prefetch helps fairness, similarly to the old worker.
celery_app.conf.update(
    task_default_queue=DEFAULT_QUEUE,
    task_acks_late=True,
    worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    broker_heartbeat=0,  # let kombu handle heartbeats robustly in some envs
)

# Expose a shortcut for Celery CLI discovery: `celery -A app.celery_app worker ...`
__all__ = ["celery_app"]

backend/app/workers/celery_tasks.py Normal file
View File

@@ -0,0 +1,44 @@
import logging
from typing import Any, Dict
from celery import shared_task
# Module-level logger for the mail tasks. Attach a stream handler (and an INFO
# level) only when the hosting process has not already configured one, so we
# never double-log under an app-managed logging setup.
logger = logging.getLogger("celery_tasks")
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
@shared_task(name="workers.send_email")
def send_email(payload: Dict[str, Any]) -> None:
    """Celery task to send an email.

    Expected payload schema:
        {
            "type": "email",
            "to": "recipient@example.com",
            "subject": "Subject text",
            "body": "Email body text"
        }

    This mirrors the previous queue worker contract to minimize changes upstream.
    """
    recipient = payload.get("to")
    subject_line = payload.get("subject")
    message_body = payload.get("body")
    # Best-effort contract: a malformed payload is logged and dropped rather
    # than raised, matching the previous queue worker's behaviour.
    if not recipient or not subject_line or not message_body:
        logger.error("Email task missing fields. Payload: %s", payload)
        return
    # Placeholder for real email sending logic
    # Implement SMTP provider call here
    logger.info(
        "[Celery] Email sent | to=%s | subject=%s | body_len=%d",
        recipient,
        subject_line,
        len(message_body),
    )
@shared_task(name="workers.send_email_fields")
def send_email_fields(to: str, subject: str, body: str) -> None:
    """Alternate signature for convenience when calling from app code.

    Routes to the same implementation as send_email.
    """
    payload = {"type": "email", "to": to, "subject": subject, "body": body}
    send_email(payload)