diff --git a/backend/app/celery_app.py b/backend/app/celery_app.py new file mode 100644 index 0000000..6655068 --- /dev/null +++ b/backend/app/celery_app.py @@ -0,0 +1,61 @@ +import os +from celery import Celery + +# Reuse existing RabbitMQ configuration env vars with vhost and optional TLS support +# If RABBITMQ_URL is provided, it takes precedence. Otherwise compose from parts. +if os.getenv("RABBITMQ_URL"): + RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore +else: + from urllib.parse import quote + + username = os.getenv("RABBITMQ_USERNAME", "user") + password = os.getenv("RABBITMQ_PASSWORD", "bitnami123") + host = os.getenv("RABBITMQ_HOST", "localhost") + port = os.getenv("RABBITMQ_PORT", "5672") + vhost = os.getenv("RABBITMQ_VHOST", "/") + use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"} + scheme = "amqps" if use_ssl else "amqp" + + # Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them. + if vhost in ("/", ""): + vhost_path = "/" # will become '//' after concatenation below + else: + vhost_path = f"/{quote(vhost, safe='')}" + + # Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/') + RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}" + if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"): + RABBITMQ_URL += "/" + +# Default queue name to keep parity with the previous worker +DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue") + +# Use RPC backend by default to avoid coupling to Redis +CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://") + +celery_app = Celery( + "app", + broker=RABBITMQ_URL, + backend=CELERY_BACKEND, + include=[ + "app.workers.celery_tasks", + ], +) + +# Ensure this Celery app becomes the default for producers (e.g., FastAPI process) +# so that @shared_task.delay(...) uses the same broker/credentials as the worker. +celery_app.set_default() + +# Basic, safe defaults – single prefetch helps fairness similarly to the old worker +celery_app.conf.update( + task_default_queue=DEFAULT_QUEUE, + task_acks_late=True, + worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")), + task_serializer="json", + result_serializer="json", + accept_content=["json"], + broker_heartbeat=0, # let kombu handle heartbeats robustly in some envs +) + +# Expose a shortcut for Celery CLI discovery: `celery -A app.celery_app worker ...` +__all__ = ["celery_app"] diff --git a/backend/app/workers/celery_tasks.py b/backend/app/workers/celery_tasks.py new file mode 100644 index 0000000..92b16b2 --- /dev/null +++ b/backend/app/workers/celery_tasks.py @@ -0,0 +1,44 @@ +import logging +from typing import Any, Dict + +from celery import shared_task + +logger = logging.getLogger("celery_tasks") +if not logger.handlers: + _h = logging.StreamHandler() + logger.addHandler(_h) +logger.setLevel(logging.INFO) + + +@shared_task(name="workers.send_email") +def send_email(payload: Dict[str, Any]) -> None: + """Celery task to send an email. + + Expected payload schema: + { + "type": "email", + "to": "recipient@example.com", + "subject": "Subject text", + "body": "Email body text" + } + + This mirrors the previous queue worker contract to minimize changes upstream. + """ + to = payload.get("to") + subject = payload.get("subject") + body = payload.get("body") + if not (to and subject and body): + logger.error("Email task missing fields. Payload: %s", payload) + return + + # Placeholder for real email sending logic + # Implement SMTP provider call here + logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) + + +@shared_task(name="workers.send_email_fields") +def send_email_fields(to: str, subject: str, body: str) -> None: + """Alternate signature for convenience when calling from app code. + Routes to the same implementation as send_email. + """ + send_email({"type": "email", "to": to, "subject": subject, "body": body})