diff --git a/backend/app/celery_app.py b/backend/app/celery_app.py new file mode 100644 index 0000000..574964d --- /dev/null +++ b/backend/app/celery_app.py @@ -0,0 +1,50 @@ +import os +from celery import Celery + +if os.getenv("RABBITMQ_URL"): + RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore +else: + from urllib.parse import quote + + username = os.getenv("RABBITMQ_USERNAME", "user") + password = os.getenv("RABBITMQ_PASSWORD", "bitnami123") + host = os.getenv("RABBITMQ_HOST", "localhost") + port = os.getenv("RABBITMQ_PORT", "5672") + vhost = os.getenv("RABBITMQ_VHOST", "/") + use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"} + scheme = "amqps" if use_ssl else "amqp" + + # Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them. + if vhost in ("/", ""): + vhost_path = "/" # will become '//' after concatenation below + else: + vhost_path = f"/{quote(vhost, safe='')}" + + # Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/') + RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}" + if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"): + RABBITMQ_URL += "/" + +DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue") + +CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://") + +celery_app = Celery( + "app", + broker=RABBITMQ_URL, + # backend=CELERY_BACKEND, +) +celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks + +celery_app.set_default() + +celery_app.conf.update( + task_default_queue=DEFAULT_QUEUE, + task_acks_late=True, + worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")), + task_serializer="json", + result_serializer="json", + accept_content=["json"], +) + +__all__ = ["celery_app"] diff --git a/backend/app/core/queue.py b/backend/app/core/queue.py index 2aecf9e..1fd564e 100644 --- a/backend/app/core/queue.py +++ b/backend/app/core/queue.py @@ -1,35 +1,6 @@ -import json -import os -from typing import Any, Dict -import asyncio +import app.celery_app # noqa: F401 +from app.workers.celery_tasks import send_email -RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( - f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" - f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" - f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" - f"{os.getenv('RABBITMQ_PORT', '5672')}" -) -QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") - -async def _publish_async(message: Dict[str, Any]) -> None: - import aio_pika - connection = await aio_pika.connect_robust(RABBITMQ_URL) - try: - channel = await connection.channel() - await channel.declare_queue(QUEUE_NAME, durable=True) - body = json.dumps(message).encode("utf-8") - await channel.default_exchange.publish( - aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT), - routing_key=QUEUE_NAME, - ) - finally: - await connection.close() def enqueue_email(to: str, subject: str, body: str) -> None: - message = {"type": "email", "to": to, "subject": subject, "body": body} - try: - loop = asyncio.get_running_loop() - loop.create_task(_publish_async(message)) - except RuntimeError: - asyncio.run(_publish_async(message)) - + send_email.delay(to, subject, body) diff --git a/backend/app/services/user_service.py b/backend/app/services/user_service.py index e92d9d3..1d3857e 100644 --- a/backend/app/services/user_service.py +++ b/backend/app/services/user_service.py @@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin from fastapi_users.authentication import ( AuthenticationBackend, BearerTransport, - JWTStrategy, ) +from fastapi_users.authentication.strategy.jwt import JWTStrategy from fastapi_users.db import SQLAlchemyUserDatabase from app.models.user import User @@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]): ) try: enqueue_email(to=user.email, subject=subject, body=body) - except Exception: + except Exception as e: print("[Email Fallback] To:", user.email) print("[Email Fallback] Subject:", subject) print("[Email Fallback] Body:\n", body) diff --git a/backend/app/workers/celery_tasks.py b/backend/app/workers/celery_tasks.py new file mode 100644 index 0000000..861004c --- /dev/null +++ b/backend/app/workers/celery_tasks.py @@ -0,0 +1,19 @@ +import logging + +from celery import shared_task + +logger = logging.getLogger("celery_tasks") +if not logger.handlers: + _h = logging.StreamHandler() + logger.addHandler(_h) +logger.setLevel(logging.INFO) + + +@shared_task(name="workers.send_email") +def send_email(to: str, subject: str, body: str) -> None: + if not (to and subject and body): + logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0) + return + + # Placeholder for real email sending logic + logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) diff --git a/backend/requirements.txt b/backend/requirements.txt index 045371e..1f8e9bf 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -2,14 +2,20 @@ aio-pika==9.5.6 aiormq==6.8.1 aiosqlite==0.21.0 alembic==1.16.5 +amqp==5.3.1 annotated-types==0.7.0 anyio==4.11.0 argon2-cffi==23.1.0 argon2-cffi-bindings==25.1.0 asyncmy==0.2.9 bcrypt==4.3.0 +billiard==4.2.2 +celery==5.5.3 cffi==2.0.0 click==8.1.8 +click-didyoumean==0.3.1 +click-plugins==1.1.1.2 +click-repl==0.3.0 cryptography==46.0.1 dnspython==2.7.0 email_validator==2.2.0 @@ -21,11 +27,14 @@ greenlet==3.2.4 h11==0.16.0 httptools==0.6.4 idna==3.10 +kombu==5.5.4 makefun==1.16.0 Mako==1.3.10 MarkupSafe==3.0.2 multidict==6.6.4 +packaging==25.0 pamqp==3.3.0 +prompt_toolkit==3.0.52 propcache==0.3.2 pwdlib==0.2.1 pycparser==2.23 @@ -33,17 +42,22 @@ pydantic==2.11.9 pydantic_core==2.33.2 PyJWT==2.10.1 PyMySQL==1.1.2 +python-dateutil==2.9.0.post0 python-dotenv==1.1.1 python-multipart==0.0.20 PyYAML==6.0.2 +six==1.17.0 sniffio==1.3.1 SQLAlchemy==2.0.43 starlette==0.48.0 tomli==2.2.1 typing-inspection==0.4.1 typing_extensions==4.15.0 +tzdata==2025.2 uvicorn==0.37.0 uvloop==0.21.0 +vine==5.1.0 watchfiles==1.1.0 +wcwidth==0.2.14 websockets==15.0.1 yarl==1.20.1 diff --git a/backend/worker/__init__.py b/backend/worker/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/backend/worker/email_worker.py b/backend/worker/email_worker.py deleted file mode 100644 index 5f927b6..0000000 --- a/backend/worker/email_worker.py +++ /dev/null @@ -1,56 +0,0 @@ -import asyncio -import json -import os -from typing import Any, Dict - -RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( - f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" - f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" - f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" - f"{os.getenv('RABBITMQ_PORT', '5672')}" -) -QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") - - -async def handle_message(message_body: bytes) -> None: - try: - data: Dict[str, Any] = json.loads(message_body.decode("utf-8")) - except Exception as e: - print(f"[email_worker] Failed to decode message: {e}") - return - - if data.get("type") != "email": - print(f"[email_worker] Unknown message type: {data}") - return - - to = data.get("to") - subject = data.get("subject") - body = data.get("body") - if not (to and subject and body): - print(f"[email_worker] Incomplete email message: {data}") - return - - try: - await send_email(to=to, subject=subject, body=body) - print(f"[email_worker] Sent email to {to}") - except Exception as e: - print(f"[email_worker] Error sending email to {to}: {e}") - - -async def main() -> None: - import aio_pika - - print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}") - connection = await aio_pika.connect_robust(RABBITMQ_URL) - channel = await connection.channel() - queue = await channel.declare_queue(QUEUE_NAME, durable=True) - print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...") - - async with queue.iterator() as queue_iter: - async for message in queue_iter: - async with message.process(requeue=False): - await handle_message(message.body) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml index b6a709f..233d095 100644 --- a/deployment/app-demo-worker-deployment.yaml +++ b/deployment/app-demo-worker-deployment.yaml @@ -17,8 +17,14 @@ spec: - image: lukastrkan/cc-app-demo@sha256:75634b4d97282b6b8424fe17767c81adf44af5f7359c1d25883073b5629b3e05 name: app-demo-worker command: - - python3 - - worker/email_worker.py + - celery + - -A + - app.celery_app + - worker + - -Q + - $(MAIL_QUEUE) + - --loglevel + - INFO env: - name: RABBITMQ_USERNAME value: demo-app