mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 15:12:08 +01:00
Compare commits
10 Commits
main
...
fdf7cef39d
| Author | SHA1 | Date | |
|---|---|---|---|
| fdf7cef39d | |||
| 145565b542 | |||
| 9a436d3c70 | |||
| 49efd88f29 | |||
| 3e809782a6 | |||
| 7cd96c830d | |||
| 233a331cba | |||
| a0bc94d7ec | |||
| e31ec199c0 | |||
| 6d8b760a7d |
50
backend/app/celery_app.py
Normal file
50
backend/app/celery_app.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import os
|
||||
from celery import Celery
|
||||
|
||||
if os.getenv("RABBITMQ_URL"):
|
||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
|
||||
else:
|
||||
from urllib.parse import quote
|
||||
|
||||
username = os.getenv("RABBITMQ_USERNAME", "user")
|
||||
password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
|
||||
host = os.getenv("RABBITMQ_HOST", "localhost")
|
||||
port = os.getenv("RABBITMQ_PORT", "5672")
|
||||
vhost = os.getenv("RABBITMQ_VHOST", "/")
|
||||
use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
|
||||
scheme = "amqps" if use_ssl else "amqp"
|
||||
|
||||
# Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
|
||||
if vhost in ("/", ""):
|
||||
vhost_path = "/" # will become '//' after concatenation below
|
||||
else:
|
||||
vhost_path = f"/{quote(vhost, safe='')}"
|
||||
|
||||
# Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/')
|
||||
RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"
|
||||
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
|
||||
RABBITMQ_URL += "/"
|
||||
|
||||
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||
|
||||
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
|
||||
|
||||
celery_app = Celery(
|
||||
"app",
|
||||
broker=RABBITMQ_URL,
|
||||
# backend=CELERY_BACKEND,
|
||||
)
|
||||
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
|
||||
|
||||
celery_app.set_default()
|
||||
|
||||
celery_app.conf.update(
|
||||
task_default_queue=DEFAULT_QUEUE,
|
||||
task_acks_late=True,
|
||||
worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
|
||||
task_serializer="json",
|
||||
result_serializer="json",
|
||||
accept_content=["json"],
|
||||
)
|
||||
|
||||
__all__ = ["celery_app"]
|
||||
@@ -1,35 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
import asyncio
|
||||
import app.celery_app # noqa: F401
|
||||
from app.workers.celery_tasks import send_email
|
||||
|
||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
||||
)
|
||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||
|
||||
async def _publish_async(message: Dict[str, Any]) -> None:
|
||||
import aio_pika
|
||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
||||
try:
|
||||
channel = await connection.channel()
|
||||
await channel.declare_queue(QUEUE_NAME, durable=True)
|
||||
body = json.dumps(message).encode("utf-8")
|
||||
await channel.default_exchange.publish(
|
||||
aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
|
||||
routing_key=QUEUE_NAME,
|
||||
)
|
||||
finally:
|
||||
await connection.close()
|
||||
|
||||
def enqueue_email(to: str, subject: str, body: str) -> None:
|
||||
message = {"type": "email", "to": to, "subject": subject, "body": body}
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.create_task(_publish_async(message))
|
||||
except RuntimeError:
|
||||
asyncio.run(_publish_async(message))
|
||||
|
||||
send_email.delay(to, subject, body)
|
||||
|
||||
@@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
|
||||
from fastapi_users.authentication import (
|
||||
AuthenticationBackend,
|
||||
BearerTransport,
|
||||
JWTStrategy,
|
||||
)
|
||||
from fastapi_users.authentication.strategy.jwt import JWTStrategy
|
||||
from fastapi_users.db import SQLAlchemyUserDatabase
|
||||
|
||||
from app.models.user import User
|
||||
@@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
||||
)
|
||||
try:
|
||||
enqueue_email(to=user.email, subject=subject, body=body)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
print("[Email Fallback] To:", user.email)
|
||||
print("[Email Fallback] Subject:", subject)
|
||||
print("[Email Fallback] Body:\n", body)
|
||||
|
||||
19
backend/app/workers/celery_tasks.py
Normal file
19
backend/app/workers/celery_tasks.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import logging
|
||||
|
||||
from celery import shared_task
|
||||
|
||||
logger = logging.getLogger("celery_tasks")
|
||||
if not logger.handlers:
|
||||
_h = logging.StreamHandler()
|
||||
logger.addHandler(_h)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
@shared_task(name="workers.send_email")
|
||||
def send_email(to: str, subject: str, body: str) -> None:
|
||||
if not (to and subject and body):
|
||||
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
|
||||
return
|
||||
|
||||
# Placeholder for real email sending logic
|
||||
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
|
||||
@@ -2,14 +2,20 @@ aio-pika==9.5.6
|
||||
aiormq==6.8.1
|
||||
aiosqlite==0.21.0
|
||||
alembic==1.16.5
|
||||
amqp==5.3.1
|
||||
annotated-types==0.7.0
|
||||
anyio==4.11.0
|
||||
argon2-cffi==23.1.0
|
||||
argon2-cffi-bindings==25.1.0
|
||||
asyncmy==0.2.9
|
||||
bcrypt==4.3.0
|
||||
billiard==4.2.2
|
||||
celery==5.5.3
|
||||
cffi==2.0.0
|
||||
click==8.1.8
|
||||
click-didyoumean==0.3.1
|
||||
click-plugins==1.1.1.2
|
||||
click-repl==0.3.0
|
||||
cryptography==46.0.1
|
||||
dnspython==2.7.0
|
||||
email_validator==2.2.0
|
||||
@@ -21,11 +27,14 @@ greenlet==3.2.4
|
||||
h11==0.16.0
|
||||
httptools==0.6.4
|
||||
idna==3.10
|
||||
kombu==5.5.4
|
||||
makefun==1.16.0
|
||||
Mako==1.3.10
|
||||
MarkupSafe==3.0.2
|
||||
multidict==6.6.4
|
||||
packaging==25.0
|
||||
pamqp==3.3.0
|
||||
prompt_toolkit==3.0.52
|
||||
propcache==0.3.2
|
||||
pwdlib==0.2.1
|
||||
pycparser==2.23
|
||||
@@ -33,17 +42,22 @@ pydantic==2.11.9
|
||||
pydantic_core==2.33.2
|
||||
PyJWT==2.10.1
|
||||
PyMySQL==1.1.2
|
||||
python-dateutil==2.9.0.post0
|
||||
python-dotenv==1.1.1
|
||||
python-multipart==0.0.20
|
||||
PyYAML==6.0.2
|
||||
six==1.17.0
|
||||
sniffio==1.3.1
|
||||
SQLAlchemy==2.0.43
|
||||
starlette==0.48.0
|
||||
tomli==2.2.1
|
||||
typing-inspection==0.4.1
|
||||
typing_extensions==4.15.0
|
||||
tzdata==2025.2
|
||||
uvicorn==0.37.0
|
||||
uvloop==0.21.0
|
||||
vine==5.1.0
|
||||
watchfiles==1.1.0
|
||||
wcwidth==0.2.14
|
||||
websockets==15.0.1
|
||||
yarl==1.20.1
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
|
||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
||||
)
|
||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||
|
||||
|
||||
async def handle_message(message_body: bytes) -> None:
|
||||
try:
|
||||
data: Dict[str, Any] = json.loads(message_body.decode("utf-8"))
|
||||
except Exception as e:
|
||||
print(f"[email_worker] Failed to decode message: {e}")
|
||||
return
|
||||
|
||||
if data.get("type") != "email":
|
||||
print(f"[email_worker] Unknown message type: {data}")
|
||||
return
|
||||
|
||||
to = data.get("to")
|
||||
subject = data.get("subject")
|
||||
body = data.get("body")
|
||||
if not (to and subject and body):
|
||||
print(f"[email_worker] Incomplete email message: {data}")
|
||||
return
|
||||
|
||||
try:
|
||||
await send_email(to=to, subject=subject, body=body)
|
||||
print(f"[email_worker] Sent email to {to}")
|
||||
except Exception as e:
|
||||
print(f"[email_worker] Error sending email to {to}: {e}")
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
import aio_pika
|
||||
|
||||
print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}")
|
||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
||||
channel = await connection.channel()
|
||||
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
||||
print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")
|
||||
|
||||
async with queue.iterator() as queue_iter:
|
||||
async for message in queue_iter:
|
||||
async with message.process(requeue=False):
|
||||
await handle_message(message.body)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -17,8 +17,14 @@ spec:
|
||||
- image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
|
||||
name: app-demo-worker
|
||||
command:
|
||||
- python3
|
||||
- worker/email_worker.py
|
||||
- celery
|
||||
- -A
|
||||
- app.celery_app
|
||||
- worker
|
||||
- -Q
|
||||
- $(MAIL_QUEUE)
|
||||
- --loglevel
|
||||
- INFO
|
||||
env:
|
||||
- name: RABBITMQ_USERNAME
|
||||
value: demo-app
|
||||
|
||||
Reference in New Issue
Block a user