mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 23:20:56 +01:00
48 lines
1.6 KiB
Python
48 lines
1.6 KiB
Python
import json
|
||
import os
|
||
from typing import Any, Dict
|
||
|
||
import asyncio
|
||
|
||
# Broker connection URL. A non-empty RABBITMQ_URL environment variable is used
# verbatim; otherwise the URL is assembled from the individual RABBITMQ_*
# variables, each falling back to the default shown below.
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or "amqp://{user}:{password}@{host}:{port}".format(
    user=os.getenv("RABBITMQ_USERNAME", "user"),
    password=os.getenv("RABBITMQ_PASSWORD", "bitnami123"),
    host=os.getenv("RABBITMQ_HOST", "localhost"),
    port=os.getenv("RABBITMQ_PORT", "5672"),
)

# Name of the queue that mail messages are published to (MAIL_QUEUE overrides).
QUEUE_NAME = os.environ.get("MAIL_QUEUE", "mail_queue")
|
||
|
||
|
||
async def _publish_async(message: Dict[str, Any]) -> None:
    """Publish ``message`` as a persistent JSON message on the mail queue.

    Opens a fresh connection to ``RABBITMQ_URL``, declares ``QUEUE_NAME`` as a
    durable queue, publishes the JSON-encoded message through the default
    exchange with ``QUEUE_NAME`` as routing key, and closes the connection
    again whether or not publishing succeeded.

    Args:
        message: JSON-serialisable mapping to send.
    """
    # Deferred import so that merely importing this module does not require
    # aio_pika to be installed.
    import aio_pika

    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    # The connection's async context manager closes it on exit, including
    # when an exception escapes the body.
    async with connection:
        channel = await connection.channel()
        await channel.declare_queue(QUEUE_NAME, durable=True)

        payload = aio_pika.Message(
            body=json.dumps(message).encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        await channel.default_exchange.publish(payload, routing_key=QUEUE_NAME)
|
||
|
||
|
||
# Strong references to in-flight background publish tasks. The event loop only
# keeps weak references to tasks, so an otherwise unreferenced fire-and-forget
# task could be garbage-collected before it finishes.
_PENDING_PUBLISH_TASKS = set()


def _on_publish_done(task) -> None:
    """Release a finished background publish task and log its failure, if any."""
    # Imported locally so this block does not depend on a module-level import.
    import logging

    _PENDING_PUBLISH_TASKS.discard(task)
    if task.cancelled():
        return
    # Retrieving the exception also prevents asyncio's
    # "Task exception was never retrieved" warning.
    error = task.exception()
    if error is not None:
        logging.getLogger(__name__).error(
            "Failed to enqueue email to RabbitMQ", exc_info=error
        )


def enqueue_email(to: str, subject: str, body: str) -> None:
    """Enqueue an email message on the RabbitMQ mail queue.

    When called while an event loop is running, publishing is scheduled as a
    background task and this function returns immediately; a failure of that
    task is logged rather than raised, because the caller has already moved
    on. When no loop is running, the message is published synchronously and
    any exception from publishing (for example ``ImportError`` if aio_pika is
    not installed, or a connection failure) propagates to the caller, who may
    implement a fallback such as sending directly.

    Args:
        to: Recipient address.
        subject: Subject line of the email.
        body: Body text of the email.
    """
    message = {"type": "email", "to": to, "subject": subject, "body": body}

    # Keep the try body minimal: only the loop lookup signals "no running
    # loop" via RuntimeError.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        # Sync context: run a short-lived loop. Done outside the except block
        # so publish errors are not chained to the unrelated RuntimeError.
        asyncio.run(_publish_async(message))
        return

    # Fire-and-forget so the request path is not blocked; hold a reference
    # until the task completes and surface failures through the callback.
    task = loop.create_task(_publish_async(message))
    _PENDING_PUBLISH_TASKS.add(task)
    task.add_done_callback(_on_publish_done)
|