Files
uis-cloud-computing/backend/app/queue.py

48 lines
1.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import logging
import os
from typing import Any, Dict
# Broker connection URL. A non-empty RABBITMQ_URL is used verbatim; otherwise
# the URL is composed from RABBITMQ_USERNAME / RABBITMQ_PASSWORD /
# RABBITMQ_HOST / RABBITMQ_PORT, each of which has a built-in fallback value.
# NOTE(review): the fallback credentials are hard-coded here; presumably they
# are meant for local development only -- confirm before deploying.
if os.environ.get("RABBITMQ_URL"):
    RABBITMQ_URL = os.environ["RABBITMQ_URL"]
else:
    RABBITMQ_URL = (
        f"amqp://{os.environ.get('RABBITMQ_USERNAME', 'user')}:"
        f"{os.environ.get('RABBITMQ_PASSWORD', 'bitnami123')}@"
        f"{os.environ.get('RABBITMQ_HOST', 'localhost')}:"
        f"{os.environ.get('RABBITMQ_PORT', '5672')}"
    )

# Name of the queue that mail jobs are published to (overridable via MAIL_QUEUE).
QUEUE_NAME = os.environ.get("MAIL_QUEUE", "mail_queue")
async def _publish_async(message: Dict[str, Any]) -> None:
    """Publish ``message`` to ``QUEUE_NAME`` as a persistent JSON payload.

    A new connection to ``RABBITMQ_URL`` is opened for the call and closed
    again before returning, whether or not publishing succeeded.

    Args:
        message: JSON-serializable mapping to send.
    """
    # Deferred import so that merely importing this module does not require
    # aio_pika to be installed.
    import aio_pika

    broker = await aio_pika.connect_robust(RABBITMQ_URL)
    # Leaving the ``async with`` block closes the connection, on success or error.
    async with broker:
        chan = await broker.channel()
        # Declare the queue as durable before publishing to it.
        await chan.declare_queue(QUEUE_NAME, durable=True)

        payload = json.dumps(message).encode("utf-8")
        outgoing = aio_pika.Message(
            body=payload,
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        # Publishing through the default exchange with the queue name as
        # routing key.
        await chan.default_exchange.publish(outgoing, routing_key=QUEUE_NAME)
# Logger used to report failures of background publish tasks.
_logger = logging.getLogger(__name__)

# Strong references to in-flight publish tasks. The event loop itself keeps
# only weak references to tasks, so a task that nothing else references may be
# garbage-collected before it finishes.
_background_tasks = set()


def _on_publish_done(task: asyncio.Task) -> None:
    """Drop the reference to a finished publish task and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    # Retrieving the exception also prevents asyncio's
    # "Task exception was never retrieved" warning.
    error = task.exception()
    if error is not None:
        _logger.error("Background email enqueue failed", exc_info=error)


def enqueue_email(to: str, subject: str, body: str) -> None:
    """Enqueue an email job on RabbitMQ.

    Behavior depends on the calling context:

    * Called while an event loop is running: publishing is scheduled as a
      background task and this function returns immediately. Failures of that
      task cannot be raised to the caller; they are logged instead.
    * Called with no running event loop: publishing runs to completion via
      ``asyncio.run`` and any exception it raises (for example ``ImportError``
      when aio_pika is missing, or a connection error) propagates to the
      caller, which may implement a fallback such as sending directly.

    Args:
        to: Recipient address.
        subject: Subject line of the email.
        body: Body text of the email.
    """
    message = {"type": "email", "to": to, "subject": subject, "body": body}

    # Only the loop lookup is guarded: a RuntimeError raised by anything else
    # must not be mistaken for "no running loop" and trigger a second publish.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        # Synchronous caller: run a short-lived loop. Done outside the
        # ``except`` block so publish errors are not chained to the
        # "no running event loop" RuntimeError.
        asyncio.run(_publish_async(message))
        return

    # Fire-and-forget so the request path is not blocked; keep a reference
    # until the task completes.
    task = loop.create_task(_publish_async(message))
    _background_tasks.add(task)
    task.add_done_callback(_on_publish_done)