From 145565b542795092793605a5891ee3c524be6a93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 14:59:44 +0200 Subject: [PATCH] feat(infrastructure): use celery worker --- backend/app/celery_app.py | 15 ++------------- backend/app/core/queue.py | 26 ++------------------------ backend/app/workers/celery_tasks.py | 29 ++--------------------------- 3 files changed, 6 insertions(+), 64 deletions(-) diff --git a/backend/app/celery_app.py b/backend/app/celery_app.py index 6655068..574964d 100644 --- a/backend/app/celery_app.py +++ b/backend/app/celery_app.py @@ -1,8 +1,6 @@ import os from celery import Celery -# Reuse existing RabbitMQ configuration env vars with vhost and optional TLS support -# If RABBITMQ_URL is provided, it takes precedence. Otherwise compose from parts. if os.getenv("RABBITMQ_URL"): RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore else: @@ -27,26 +25,19 @@ else: if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"): RABBITMQ_URL += "/" -# Default queue name to keep parity with the previous worker DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue") -# Use RPC backend by default to avoid coupling to Redis CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://") celery_app = Celery( "app", broker=RABBITMQ_URL, - backend=CELERY_BACKEND, - include=[ - "app.workers.celery_tasks", - ], + # backend=CELERY_BACKEND, ) +celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks -# Ensure this Celery app becomes the default for producers (e.g., FastAPI process) -# so that @shared_task.delay(...) uses the same broker/credentials as the worker. celery_app.set_default() -# Basic, safe defaults – single prefetch helps fairness similarly to the old worker celery_app.conf.update( task_default_queue=DEFAULT_QUEUE, task_acks_late=True, @@ -54,8 +45,6 @@ celery_app.conf.update( task_serializer="json", result_serializer="json", accept_content=["json"], - broker_heartbeat=0, # let kombu handle heartbeats robustly in some envs ) -# Expose a shortcut for Celery CLI discovery: `celery -A app.celery_app worker ...` __all__ = ["celery_app"] diff --git a/backend/app/core/queue.py b/backend/app/core/queue.py index f899a1a..1fd564e 100644 --- a/backend/app/core/queue.py +++ b/backend/app/core/queue.py @@ -1,28 +1,6 @@ -# Route email jobs via Celery instead of raw RabbitMQ publishing -# Ensure Celery app is initialized so producers use correct broker credentials. -# Import has side effects (sets Celery default app), safe to keep at module level. import app.celery_app # noqa: F401 - -# Use a lazy proxy so _send_email_task is never None, even if Celery modules -# are not yet importable at import time (e.g., different working dir during startup). -class _LazySendTask: - def delay(self, to: str, subject: str, body: str): # type: ignore[no-untyped-def] - # Late import on first use - from app.workers.celery_tasks import send_email_fields as _send # type: ignore - return _send.delay(to, subject, body) - - -try: - # Try eager import for normal operation - from app.workers.celery_tasks import send_email_fields as _send_email_task # type: ignore -except ImportError: # pragma: no cover - fallback lazy import proxy - _send_email_task = _LazySendTask() # type: ignore +from app.workers.celery_tasks import send_email def enqueue_email(to: str, subject: str, body: str) -> None: - """Enqueue an email job using Celery. - - Keeps the same public API as before. - """ - _send_email_task.delay(to, subject, body) - + send_email.delay(to, subject, body) diff --git a/backend/app/workers/celery_tasks.py b/backend/app/workers/celery_tasks.py index 92b16b2..861004c 100644 --- a/backend/app/workers/celery_tasks.py +++ b/backend/app/workers/celery_tasks.py @@ -1,5 +1,4 @@ import logging -from typing import Any, Dict from celery import shared_task @@ -11,34 +10,10 @@ logger.setLevel(logging.INFO) @shared_task(name="workers.send_email") -def send_email(payload: Dict[str, Any]) -> None: - """Celery task to send an email. - - Expected payload schema: - { - "type": "email", - "to": "recipient@example.com", - "subject": "Subject text", - "body": "Email body text" - } - - This mirrors the previous queue worker contract to minimize changes upstream. - """ - to = payload.get("to") - subject = payload.get("subject") - body = payload.get("body") +def send_email(to: str, subject: str, body: str) -> None: if not (to and subject and body): - logger.error("Email task missing fields. Payload: %s", payload) + logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0) return # Placeholder for real email sending logic - # Implement SMTP provider call here logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) - - -@shared_task(name="workers.send_email_fields") -def send_email_fields(to: str, subject: str, body: str) -> None: - """Alternate signature for convenience when calling from app code. - Routes to the same implementation as send_email. - """ - send_email({"type": "email", "to": to, "subject": subject, "body": body})