Compare commits

...

10 Commits

8 changed files with 96 additions and 92 deletions

50
backend/app/celery_app.py Normal file
View File

@@ -0,0 +1,50 @@
import os
from celery import Celery
if os.getenv("RABBITMQ_URL"):
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
else:
from urllib.parse import quote
username = os.getenv("RABBITMQ_USERNAME", "user")
password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
host = os.getenv("RABBITMQ_HOST", "localhost")
port = os.getenv("RABBITMQ_PORT", "5672")
vhost = os.getenv("RABBITMQ_VHOST", "/")
use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
scheme = "amqps" if use_ssl else "amqp"
# Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
if vhost in ("/", ""):
vhost_path = "/" # will become '//' after concatenation below
else:
vhost_path = f"/{quote(vhost, safe='')}"
# Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/')
RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
RABBITMQ_URL += "/"
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
celery_app = Celery(
"app",
broker=RABBITMQ_URL,
# backend=CELERY_BACKEND,
)
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
celery_app.set_default()
celery_app.conf.update(
task_default_queue=DEFAULT_QUEUE,
task_acks_late=True,
worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
task_serializer="json",
result_serializer="json",
accept_content=["json"],
)
__all__ = ["celery_app"]

View File

@@ -1,35 +1,6 @@
import json
import os
from typing import Any, Dict
import asyncio
import app.celery_app # noqa: F401
from app.workers.celery_tasks import send_email
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
async def _publish_async(message: Dict[str, Any]) -> None:
import aio_pika
connection = await aio_pika.connect_robust(RABBITMQ_URL)
try:
channel = await connection.channel()
await channel.declare_queue(QUEUE_NAME, durable=True)
body = json.dumps(message).encode("utf-8")
await channel.default_exchange.publish(
aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
routing_key=QUEUE_NAME,
)
finally:
await connection.close()
def enqueue_email(to: str, subject: str, body: str) -> None:
message = {"type": "email", "to": to, "subject": subject, "body": body}
try:
loop = asyncio.get_running_loop()
loop.create_task(_publish_async(message))
except RuntimeError:
asyncio.run(_publish_async(message))
send_email.delay(to, subject, body)

View File

@@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
JWTStrategy,
)
from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase
from app.models.user import User
@@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
)
try:
enqueue_email(to=user.email, subject=subject, body=body)
except Exception:
except Exception as e:
print("[Email Fallback] To:", user.email)
print("[Email Fallback] Subject:", subject)
print("[Email Fallback] Body:\n", body)

View File

@@ -0,0 +1,19 @@
import logging
from celery import shared_task
logger = logging.getLogger("celery_tasks")
if not logger.handlers:
_h = logging.StreamHandler()
logger.addHandler(_h)
logger.setLevel(logging.INFO)
@shared_task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
if not (to and subject and body):
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
return
# Placeholder for real email sending logic
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))

View File

@@ -2,14 +2,20 @@ aio-pika==9.5.6
aiormq==6.8.1
aiosqlite==0.21.0
alembic==1.16.5
amqp==5.3.1
annotated-types==0.7.0
anyio==4.11.0
argon2-cffi==23.1.0
argon2-cffi-bindings==25.1.0
asyncmy==0.2.9
bcrypt==4.3.0
billiard==4.2.2
celery==5.5.3
cffi==2.0.0
click==8.1.8
click-didyoumean==0.3.1
click-plugins==1.1.1.2
click-repl==0.3.0
cryptography==46.0.1
dnspython==2.7.0
email_validator==2.2.0
@@ -21,11 +27,14 @@ greenlet==3.2.4
h11==0.16.0
httptools==0.6.4
idna==3.10
kombu==5.5.4
makefun==1.16.0
Mako==1.3.10
MarkupSafe==3.0.2
multidict==6.6.4
packaging==25.0
pamqp==3.3.0
prompt_toolkit==3.0.52
propcache==0.3.2
pwdlib==0.2.1
pycparser==2.23
@@ -33,17 +42,22 @@ pydantic==2.11.9
pydantic_core==2.33.2
PyJWT==2.10.1
PyMySQL==1.1.2
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-multipart==0.0.20
PyYAML==6.0.2
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.43
starlette==0.48.0
tomli==2.2.1
typing-inspection==0.4.1
typing_extensions==4.15.0
tzdata==2025.2
uvicorn==0.37.0
uvloop==0.21.0
vine==5.1.0
watchfiles==1.1.0
wcwidth==0.2.14
websockets==15.0.1
yarl==1.20.1

View File

@@ -1,56 +0,0 @@
import asyncio
import json
import os
from typing import Any, Dict
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
async def handle_message(message_body: bytes) -> None:
try:
data: Dict[str, Any] = json.loads(message_body.decode("utf-8"))
except Exception as e:
print(f"[email_worker] Failed to decode message: {e}")
return
if data.get("type") != "email":
print(f"[email_worker] Unknown message type: {data}")
return
to = data.get("to")
subject = data.get("subject")
body = data.get("body")
if not (to and subject and body):
print(f"[email_worker] Incomplete email message: {data}")
return
try:
await send_email(to=to, subject=subject, body=body)
print(f"[email_worker] Sent email to {to}")
except Exception as e:
print(f"[email_worker] Error sending email to {to}: {e}")
async def main() -> None:
import aio_pika
print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}")
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process(requeue=False):
await handle_message(message.body)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -17,8 +17,14 @@ spec:
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
name: app-demo-worker
command:
- python3
- worker/email_worker.py
- celery
- -A
- app.celery_app
- worker
- -Q
- $(MAIL_QUEUE)
- --loglevel
- INFO
env:
- name: RABBITMQ_USERNAME
value: demo-app