mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 15:12:08 +01:00
58 lines
1.7 KiB
Python
58 lines
1.7 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
from typing import Any, Dict
|
|
|
|
|
|
# Broker connection settings, all overridable through the environment.
# A non-empty RABBITMQ_URL wins outright; otherwise the URL is assembled from
# the individual RABBITMQ_* variables, falling back to local-development
# defaults for any that are unset.
RABBITMQ_URL = os.environ.get("RABBITMQ_URL") or (
    "amqp://{username}:{password}@{host}:{port}".format(
        username=os.environ.get("RABBITMQ_USERNAME", "user"),
        password=os.environ.get("RABBITMQ_PASSWORD", "bitnami123"),
        host=os.environ.get("RABBITMQ_HOST", "localhost"),
        port=os.environ.get("RABBITMQ_PORT", "5672"),
    )
)

# Name of the queue the worker consumes mail jobs from.
QUEUE_NAME = os.environ.get("MAIL_QUEUE", "mail_queue")
|
|
|
|
|
|
async def handle_message(message_body: bytes) -> None:
    """Decode one queue payload and send the email it describes.

    The payload is expected to be UTF-8 encoded JSON containing an object
    with ``"type": "email"`` and non-empty ``to``, ``subject`` and ``body``
    entries. Any other payload is reported on stdout and dropped. Failures
    while decoding or sending are caught and printed rather than raised, so
    one bad message does not propagate an exception to the caller.

    Args:
        message_body: Raw message bytes as received from the queue.
    """
    try:
        data: Any = json.loads(message_body.decode("utf-8"))
    except Exception as e:
        print(f"[email_worker] Failed to decode message: {e}")
        return

    # Valid JSON is not necessarily an object: a list, string, number or null
    # has no .get(), so reject those here instead of raising AttributeError.
    if not isinstance(data, dict):
        print(f"[email_worker] Ignoring non-object message: {data!r}")
        return

    payload: Dict[str, Any] = data
    if payload.get("type") != "email":
        print(f"[email_worker] Unknown message type: {payload}")
        return

    to = payload.get("to")
    subject = payload.get("subject")
    body = payload.get("body")
    if not (to and subject and body):
        print(f"[email_worker] Incomplete email message: {payload}")
        return

    try:
        # NOTE(review): send_email is not defined or imported in the visible
        # part of this file - confirm it is in scope at runtime; otherwise the
        # NameError is caught below and every message is reported as an error.
        await send_email(to=to, subject=subject, body=body)
        print(f"[email_worker] Sent email to {to}")
    except Exception as e:
        # Best-effort delivery: report the failure and let the worker continue.
        print(f"[email_worker] Error sending email to {to}: {e}")
|
|
|
|
|
|
def _redact_url(url: str) -> str:
    """Return *url* with the password in its userinfo replaced by ``***``."""
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url)
    except ValueError:
        return "<unparseable RabbitMQ URL>"
    userinfo, at, hostport = parts.netloc.rpartition("@")
    username, colon, _password = userinfo.partition(":")
    if not (at and colon):
        # No "user:password@" section, so there is nothing to hide.
        return url
    return parts._replace(netloc=f"{username}:***@{hostport}").geturl()


async def main() -> None:
    """Connect to RabbitMQ and process messages from ``QUEUE_NAME`` forever.

    Declares the queue as durable, then passes the body of every incoming
    message to ``handle_message``. The connection is closed when the consume
    loop exits for any reason, including cancellation or an error.
    """
    import aio_pika

    # Never log the raw URL: it embeds the broker password.
    print(f"[email_worker] Connecting to RabbitMQ at {_redact_url(RABBITMQ_URL)}")
    connection = await aio_pika.connect_robust(RABBITMQ_URL)

    # The context manager closes the connection on the way out.
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # Per the aio-pika docs, process() acks on a clean exit and
                # rejects (here without requeueing) if the body raises.
                async with message.process(requeue=False):
                    await handle_message(message.body)
|
|
|
|
|
|
# Script entry point: run the consumer on a fresh event loop until it stops.
if __name__ == "__main__":
    worker = main()
    asyncio.run(worker)
|