"""Celery application wired to a RabbitMQ broker.

The broker URL is taken verbatim from ``RABBITMQ_URL`` when that variable is
set to a non-empty value; otherwise it is assembled from the individual
``RABBITMQ_*`` environment variables.  Importing this module creates
``celery_app``, registers it as the default Celery app, and applies the
queue / serialization settings below.
"""

import os
from urllib.parse import quote

from celery import Celery

# Values of RABBITMQ_USE_SSL (compared case-insensitively) that enable TLS.
_TRUTHY = {"1", "true", "yes"}


def _build_broker_url() -> str:
    """Return the AMQP broker URL derived from the environment.

    A non-empty ``RABBITMQ_URL`` is returned unchanged.  Otherwise the URL is
    built from ``RABBITMQ_USERNAME``, ``RABBITMQ_PASSWORD``, ``RABBITMQ_HOST``,
    ``RABBITMQ_PORT``, ``RABBITMQ_VHOST`` and ``RABBITMQ_USE_SSL``.

    Returns:
        A URL of the form ``amqp[s]://user:pass@host:port/<vhost>``, ending in
        ``//`` when the vhost is the default ``/`` (or empty).
    """
    explicit_url = os.getenv("RABBITMQ_URL")
    if explicit_url:
        return explicit_url

    username = os.getenv("RABBITMQ_USERNAME", "user")
    # NOTE(review): a hard-coded fallback password is a security smell; it is
    # kept only so existing local setups keep working. Consider requiring
    # RABBITMQ_PASSWORD explicitly in production.
    password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
    host = os.getenv("RABBITMQ_HOST", "localhost")
    port = os.getenv("RABBITMQ_PORT", "5672")
    vhost = os.getenv("RABBITMQ_VHOST", "/")
    use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in _TRUTHY
    scheme = "amqps" if use_ssl else "amqp"

    # Kombu uses '//' to denote the default '/' vhost. Custom vhosts are
    # URL-encoded so that characters such as '/' survive URL parsing.
    if vhost in ("/", ""):
        vhost_path = "//"
    else:
        vhost_path = f"/{quote(vhost, safe='')}"

    # Percent-encode the credentials as well: a raw '@', ':', '/', '#', '?'
    # or '%' in the username/password would otherwise corrupt the URL.
    # NOTE(review): this assumes the env vars hold the *raw* credentials; if a
    # deployment already stores them percent-encoded, they would be encoded
    # twice — confirm against deployment configuration.
    userinfo = f"{quote(username, safe='')}:{quote(password, safe='')}"

    # e.g. amqp://user:pass@host:5672// for the default vhost.
    return f"{scheme}://{userinfo}@{host}:{port}{vhost_path}"


RABBITMQ_URL = _build_broker_url()
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
# NOTE(review): CELERY_BACKEND is read here but not passed to Celery (the
# ``backend`` argument below is commented out) — confirm whether running
# without a result backend is intentional.
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")

celery_app = Celery(
    "app",
    broker=RABBITMQ_URL,
    # backend=CELERY_BACKEND,
)

# Discover tasks defined in app.workers.celery_tasks.
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks")
celery_app.set_default()

celery_app.conf.update(
    task_default_queue=DEFAULT_QUEUE,
    task_acks_late=True,
    # Raises ValueError at import time if CELERY_PREFETCH is not an integer.
    worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

__all__ = ["celery_app"]