mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 15:12:08 +01:00
Compare commits
10 Commits
merge/basi
...
d908a0843a
| Author | SHA1 | Date | |
|---|---|---|---|
| d908a0843a | |||
| 145565b542 | |||
| 9a436d3c70 | |||
| 49efd88f29 | |||
| 3e809782a6 | |||
| 7cd96c830d | |||
| 233a331cba | |||
| a0bc94d7ec | |||
| e31ec199c0 | |||
| 6d8b760a7d |
50
backend/app/celery_app.py
Normal file
50
backend/app/celery_app.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import os
|
||||||
|
from celery import Celery
|
||||||
|
|
||||||
|
if os.getenv("RABBITMQ_URL"):
|
||||||
|
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
|
||||||
|
else:
|
||||||
|
from urllib.parse import quote
|
||||||
|
|
||||||
|
username = os.getenv("RABBITMQ_USERNAME", "user")
|
||||||
|
password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
|
||||||
|
host = os.getenv("RABBITMQ_HOST", "localhost")
|
||||||
|
port = os.getenv("RABBITMQ_PORT", "5672")
|
||||||
|
vhost = os.getenv("RABBITMQ_VHOST", "/")
|
||||||
|
use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
|
||||||
|
scheme = "amqps" if use_ssl else "amqp"
|
||||||
|
|
||||||
|
# Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
|
||||||
|
if vhost in ("/", ""):
|
||||||
|
vhost_path = "/" # will become '//' after concatenation below
|
||||||
|
else:
|
||||||
|
vhost_path = f"/{quote(vhost, safe='')}"
|
||||||
|
|
||||||
|
# Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/')
|
||||||
|
RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"
|
||||||
|
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
|
||||||
|
RABBITMQ_URL += "/"
|
||||||
|
|
||||||
|
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
|
||||||
|
|
||||||
|
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
|
||||||
|
|
||||||
|
celery_app = Celery(
|
||||||
|
"app",
|
||||||
|
broker=RABBITMQ_URL,
|
||||||
|
# backend=CELERY_BACKEND,
|
||||||
|
)
|
||||||
|
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
|
||||||
|
|
||||||
|
celery_app.set_default()
|
||||||
|
|
||||||
|
celery_app.conf.update(
|
||||||
|
task_default_queue=DEFAULT_QUEUE,
|
||||||
|
task_acks_late=True,
|
||||||
|
worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
|
||||||
|
task_serializer="json",
|
||||||
|
result_serializer="json",
|
||||||
|
accept_content=["json"],
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = ["celery_app"]
|
||||||
@@ -1,35 +1,6 @@
|
|||||||
import json
|
import app.celery_app # noqa: F401
|
||||||
import os
|
from app.workers.celery_tasks import send_email
|
||||||
from typing import Any, Dict
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
|
||||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
|
||||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
|
||||||
)
|
|
||||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
|
||||||
|
|
||||||
async def _publish_async(message: Dict[str, Any]) -> None:
|
|
||||||
import aio_pika
|
|
||||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
|
||||||
try:
|
|
||||||
channel = await connection.channel()
|
|
||||||
await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
||||||
body = json.dumps(message).encode("utf-8")
|
|
||||||
await channel.default_exchange.publish(
|
|
||||||
aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
|
|
||||||
routing_key=QUEUE_NAME,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
await connection.close()
|
|
||||||
|
|
||||||
def enqueue_email(to: str, subject: str, body: str) -> None:
|
def enqueue_email(to: str, subject: str, body: str) -> None:
|
||||||
message = {"type": "email", "to": to, "subject": subject, "body": body}
|
send_email.delay(to, subject, body)
|
||||||
try:
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
loop.create_task(_publish_async(message))
|
|
||||||
except RuntimeError:
|
|
||||||
asyncio.run(_publish_async(message))
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
|
|||||||
from fastapi_users.authentication import (
|
from fastapi_users.authentication import (
|
||||||
AuthenticationBackend,
|
AuthenticationBackend,
|
||||||
BearerTransport,
|
BearerTransport,
|
||||||
JWTStrategy,
|
|
||||||
)
|
)
|
||||||
|
from fastapi_users.authentication.strategy.jwt import JWTStrategy
|
||||||
from fastapi_users.db import SQLAlchemyUserDatabase
|
from fastapi_users.db import SQLAlchemyUserDatabase
|
||||||
|
|
||||||
from app.models.user import User
|
from app.models.user import User
|
||||||
@@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
|||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
enqueue_email(to=user.email, subject=subject, body=body)
|
enqueue_email(to=user.email, subject=subject, body=body)
|
||||||
except Exception:
|
except Exception as e:
|
||||||
print("[Email Fallback] To:", user.email)
|
print("[Email Fallback] To:", user.email)
|
||||||
print("[Email Fallback] Subject:", subject)
|
print("[Email Fallback] Subject:", subject)
|
||||||
print("[Email Fallback] Body:\n", body)
|
print("[Email Fallback] Body:\n", body)
|
||||||
|
|||||||
19
backend/app/workers/celery_tasks.py
Normal file
19
backend/app/workers/celery_tasks.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from celery import shared_task
|
||||||
|
|
||||||
|
logger = logging.getLogger("celery_tasks")
|
||||||
|
if not logger.handlers:
|
||||||
|
_h = logging.StreamHandler()
|
||||||
|
logger.addHandler(_h)
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(name="workers.send_email")
|
||||||
|
def send_email(to: str, subject: str, body: str) -> None:
|
||||||
|
if not (to and subject and body):
|
||||||
|
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Placeholder for real email sending logic
|
||||||
|
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
|
||||||
@@ -2,14 +2,20 @@ aio-pika==9.5.6
|
|||||||
aiormq==6.8.1
|
aiormq==6.8.1
|
||||||
aiosqlite==0.21.0
|
aiosqlite==0.21.0
|
||||||
alembic==1.16.5
|
alembic==1.16.5
|
||||||
|
amqp==5.3.1
|
||||||
annotated-types==0.7.0
|
annotated-types==0.7.0
|
||||||
anyio==4.11.0
|
anyio==4.11.0
|
||||||
argon2-cffi==23.1.0
|
argon2-cffi==23.1.0
|
||||||
argon2-cffi-bindings==25.1.0
|
argon2-cffi-bindings==25.1.0
|
||||||
asyncmy==0.2.9
|
asyncmy==0.2.9
|
||||||
bcrypt==4.3.0
|
bcrypt==4.3.0
|
||||||
|
billiard==4.2.2
|
||||||
|
celery==5.5.3
|
||||||
cffi==2.0.0
|
cffi==2.0.0
|
||||||
click==8.1.8
|
click==8.1.8
|
||||||
|
click-didyoumean==0.3.1
|
||||||
|
click-plugins==1.1.1.2
|
||||||
|
click-repl==0.3.0
|
||||||
cryptography==46.0.1
|
cryptography==46.0.1
|
||||||
dnspython==2.7.0
|
dnspython==2.7.0
|
||||||
email_validator==2.2.0
|
email_validator==2.2.0
|
||||||
@@ -21,11 +27,14 @@ greenlet==3.2.4
|
|||||||
h11==0.16.0
|
h11==0.16.0
|
||||||
httptools==0.6.4
|
httptools==0.6.4
|
||||||
idna==3.10
|
idna==3.10
|
||||||
|
kombu==5.5.4
|
||||||
makefun==1.16.0
|
makefun==1.16.0
|
||||||
Mako==1.3.10
|
Mako==1.3.10
|
||||||
MarkupSafe==3.0.2
|
MarkupSafe==3.0.2
|
||||||
multidict==6.6.4
|
multidict==6.6.4
|
||||||
|
packaging==25.0
|
||||||
pamqp==3.3.0
|
pamqp==3.3.0
|
||||||
|
prompt_toolkit==3.0.52
|
||||||
propcache==0.3.2
|
propcache==0.3.2
|
||||||
pwdlib==0.2.1
|
pwdlib==0.2.1
|
||||||
pycparser==2.23
|
pycparser==2.23
|
||||||
@@ -33,17 +42,22 @@ pydantic==2.11.9
|
|||||||
pydantic_core==2.33.2
|
pydantic_core==2.33.2
|
||||||
PyJWT==2.10.1
|
PyJWT==2.10.1
|
||||||
PyMySQL==1.1.2
|
PyMySQL==1.1.2
|
||||||
|
python-dateutil==2.9.0.post0
|
||||||
python-dotenv==1.1.1
|
python-dotenv==1.1.1
|
||||||
python-multipart==0.0.20
|
python-multipart==0.0.20
|
||||||
PyYAML==6.0.2
|
PyYAML==6.0.2
|
||||||
|
six==1.17.0
|
||||||
sniffio==1.3.1
|
sniffio==1.3.1
|
||||||
SQLAlchemy==2.0.43
|
SQLAlchemy==2.0.43
|
||||||
starlette==0.48.0
|
starlette==0.48.0
|
||||||
tomli==2.2.1
|
tomli==2.2.1
|
||||||
typing-inspection==0.4.1
|
typing-inspection==0.4.1
|
||||||
typing_extensions==4.15.0
|
typing_extensions==4.15.0
|
||||||
|
tzdata==2025.2
|
||||||
uvicorn==0.37.0
|
uvicorn==0.37.0
|
||||||
uvloop==0.21.0
|
uvloop==0.21.0
|
||||||
|
vine==5.1.0
|
||||||
watchfiles==1.1.0
|
watchfiles==1.1.0
|
||||||
|
wcwidth==0.2.14
|
||||||
websockets==15.0.1
|
websockets==15.0.1
|
||||||
yarl==1.20.1
|
yarl==1.20.1
|
||||||
|
|||||||
@@ -1,56 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from typing import Any, Dict
|
|
||||||
|
|
||||||
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
|
|
||||||
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
|
|
||||||
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
|
|
||||||
f"{os.getenv('RABBITMQ_PORT', '5672')}"
|
|
||||||
)
|
|
||||||
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_message(message_body: bytes) -> None:
|
|
||||||
try:
|
|
||||||
data: Dict[str, Any] = json.loads(message_body.decode("utf-8"))
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[email_worker] Failed to decode message: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
if data.get("type") != "email":
|
|
||||||
print(f"[email_worker] Unknown message type: {data}")
|
|
||||||
return
|
|
||||||
|
|
||||||
to = data.get("to")
|
|
||||||
subject = data.get("subject")
|
|
||||||
body = data.get("body")
|
|
||||||
if not (to and subject and body):
|
|
||||||
print(f"[email_worker] Incomplete email message: {data}")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await send_email(to=to, subject=subject, body=body)
|
|
||||||
print(f"[email_worker] Sent email to {to}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[email_worker] Error sending email to {to}: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
|
||||||
import aio_pika
|
|
||||||
|
|
||||||
print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}")
|
|
||||||
connection = await aio_pika.connect_robust(RABBITMQ_URL)
|
|
||||||
channel = await connection.channel()
|
|
||||||
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
|
|
||||||
print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")
|
|
||||||
|
|
||||||
async with queue.iterator() as queue_iter:
|
|
||||||
async for message in queue_iter:
|
|
||||||
async with message.process(requeue=False):
|
|
||||||
await handle_message(message.body)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main())
|
|
||||||
@@ -17,8 +17,14 @@ spec:
|
|||||||
- image: lukastrkan/cc-app-demo@sha256:75634b4d97282b6b8424fe17767c81adf44af5f7359c1d25883073b5629b3e05
|
- image: lukastrkan/cc-app-demo@sha256:75634b4d97282b6b8424fe17767c81adf44af5f7359c1d25883073b5629b3e05
|
||||||
name: app-demo-worker
|
name: app-demo-worker
|
||||||
command:
|
command:
|
||||||
- python3
|
- celery
|
||||||
- worker/email_worker.py
|
- -A
|
||||||
|
- app.celery_app
|
||||||
|
- worker
|
||||||
|
- -Q
|
||||||
|
- $(MAIL_QUEUE)
|
||||||
|
- --loglevel
|
||||||
|
- INFO
|
||||||
env:
|
env:
|
||||||
- name: RABBITMQ_USERNAME
|
- name: RABBITMQ_USERNAME
|
||||||
value: demo-app
|
value: demo-app
|
||||||
|
|||||||
Reference in New Issue
Block a user