"""Email worker: consumes JSON email jobs from a RabbitMQ queue and sends them."""

import asyncio
import json
import os
from typing import Any, Dict
from urllib.parse import urlsplit, urlunsplit

# Broker URL: RABBITMQ_URL wins when set (and non-empty); otherwise it is
# assembled from the individual RABBITMQ_* variables with local defaults.
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
    f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
    f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
    f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
    f"{os.getenv('RABBITMQ_PORT', '5672')}"
)

# Name of the queue this worker consumes from.
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")


def _redact_url(url: str) -> str:
    """Return *url* with any password component replaced by ``***``.

    Used for logging only, so that broker credentials never end up in logs.
    """
    try:
        parts = urlsplit(url)
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): do not echo it at all.
        return "<unparseable URL>"
    if parts.password is None:
        return url
    userinfo, _, hostinfo = parts.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    return urlunsplit(parts._replace(netloc=f"{username}:***@{hostinfo}"))


async def handle_message(message_body: bytes) -> None:
    """Decode one queue message and send the email it describes.

    The body must be UTF-8 encoded JSON holding an object with
    ``"type": "email"`` and non-empty ``to``, ``subject`` and ``body`` fields.
    Anything else is logged and dropped. This coroutine never raises for a bad
    payload or a failed send, so one poison message cannot stop the consumer.

    Args:
        message_body: Raw message payload as delivered by the broker.
    """
    try:
        data: Any = json.loads(message_body.decode("utf-8"))
    except (ValueError, RecursionError) as e:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # RecursionError covers pathologically nested JSON.
        print(f"[email_worker] Failed to decode message: {e}")
        return

    # Valid JSON is not necessarily an object: lists, strings, numbers and
    # null have no .get(), and the resulting AttributeError would escape.
    if not isinstance(data, dict):
        print(
            "[email_worker] Unexpected message payload "
            f"(not a JSON object): {data!r}"
        )
        return
    payload: Dict[str, Any] = data

    if payload.get("type") != "email":
        print(f"[email_worker] Unknown message type: {payload}")
        return

    to = payload.get("to")
    subject = payload.get("subject")
    body = payload.get("body")
    if not (to and subject and body):
        print(f"[email_worker] Incomplete email message: {payload}")
        return

    try:
        # NOTE(review): send_email is not defined or imported in the visible
        # part of this file - confirm where it comes from. If it is missing,
        # the NameError is caught below and reported as a send failure.
        await send_email(to=to, subject=subject, body=body)
        print(f"[email_worker] Sent email to {to}")
    except Exception as e:
        # Best-effort boundary: a failed send is logged, never re-raised.
        print(f"[email_worker] Error sending email to {to}: {e}")


async def main() -> None:
    """Connect to RabbitMQ and process messages from ``QUEUE_NAME`` forever."""
    # Imported lazily so the module can be imported without aio_pika installed.
    import aio_pika

    print(f"[email_worker] Connecting to RabbitMQ at {_redact_url(RABBITMQ_URL)}")
    connection = await aio_pika.connect_robust(RABBITMQ_URL)

    # Close the connection when the consume loop exits or raises.
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # requeue=False is passed so that a message whose handler
                # raises is not put back on the queue.
                async with message.process(requeue=False):
                    await handle_message(message.body)


if __name__ == "__main__":
    asyncio.run(main())