From 6d8b760a7daff0890ab9cfd5a69a65af7ba092bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:00:57 +0200 Subject: [PATCH 1/9] feat(infrastructure): update queue worker --- backend/app/workers/email_handler.py | 29 +++ backend/app/workers/queue_worker.py | 202 +++++++++++++++++++++ backend/worker/__init__.py | 0 backend/worker/email_worker.py | 56 ------ deployment/app-demo-worker-deployment.yaml | 2 +- 5 files changed, 232 insertions(+), 57 deletions(-) create mode 100644 backend/app/workers/email_handler.py create mode 100644 backend/app/workers/queue_worker.py delete mode 100644 backend/worker/__init__.py delete mode 100644 backend/worker/email_worker.py diff --git a/backend/app/workers/email_handler.py b/backend/app/workers/email_handler.py new file mode 100644 index 0000000..cad8797 --- /dev/null +++ b/backend/app/workers/email_handler.py @@ -0,0 +1,29 @@ +import asyncio +from typing import Any, Dict + +# Import decorator and logger from the worker so handlers can register themselves +from .queue_worker import register_task_handler, logger + + +@register_task_handler("email") +async def handle_email_task(payload: Dict[str, Any]) -> None: + """Handle 'email' tasks dispatched by the queue worker. + + Expected payload schema: + { + "type": "email", + "to": "recipient@example.com", + "subject": "Subject text", + "body": "Email body text" + } + """ + to = payload.get("to") + subject = payload.get("subject") + body = payload.get("body") + if not (to and subject and body): + logger.error("Email task missing fields. 
Payload: %s", payload) + return + + # Placeholder for real email sending logic + await asyncio.sleep(0) # yield control, simulate async work + logger.info("Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py new file mode 100644 index 0000000..6dbb7ad --- /dev/null +++ b/backend/app/workers/queue_worker.py @@ -0,0 +1,202 @@ +import asyncio +import json +import logging +import os +import signal +import socket +from contextlib import suppress +from typing import Any, Awaitable, Callable, Dict + +# Build RabbitMQ URL consistent with core.queue +RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( + f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" + f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" + f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" + f"{os.getenv('RABBITMQ_PORT', '5672')}" +) +QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") + +# Logging configuration +logger = logging.getLogger("queue_worker") +logger.propagate = False # prevent double logging via root logger +if not logger.handlers: # guard against multiple imports creating duplicate handlers + _handler = logging.StreamHandler() + _formatter = logging.Formatter( + fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + _handler.setFormatter(_formatter) + logger.addHandler(_handler) +logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper()) + +# Task handler registry +TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]] +_TASK_HANDLERS: Dict[str, TaskHandler] = {} + +def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]: + def decorator(func: TaskHandler) -> TaskHandler: + _TASK_HANDLERS[task_type] = func + logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__) + return func + return decorator + +async def _dispatch(message_dict: Dict[str, Any]) -> None: + task_type = message_dict.get("type") 
+ if not task_type: + logger.warning("Skipping message without 'type': %s", message_dict) + return + handler = _TASK_HANDLERS.get(task_type) + if not handler: + logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict) + return + await handler(message_dict) + +# Auto-load all handler modules in this package so decorators can register +# without manual imports. Any file matching '*_handler.py' will be imported. + +def _auto_import_handlers() -> None: + import importlib + import os + import pkgutil + import sys + + pkg_name = __package__ # e.g., 'backend.app.workers' when run as module + dir_path = os.path.dirname(__file__) + + # Discover handler module base names in the filesystem + handler_modules = [] + for finder, name, ispkg in pkgutil.iter_modules([dir_path]): + if not ispkg and name.endswith("_handler"): + handler_modules.append(name) + + if not handler_modules: + logger.info("No handler modules discovered in %s", dir_path) + return + + imported = 0 + + # Prefer importing using the current package name so relative imports in + # handler modules (e.g., from .queue_worker import register_task_handler) + # continue to work. + bases_to_try = [] + if pkg_name: + bases_to_try.append(pkg_name) + + # If executed as a script (no package), make 'app' importable and alias this + # module as 'app.workers.queue_worker' so that handler relative imports work + if not pkg_name: + backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend + if backend_dir not in sys.path: + sys.path.insert(0, backend_dir) + try: + importlib.import_module("app.workers") + # Alias the current module under the package name to satisfy + # 'from .queue_worker import ...' 
inside handlers without re-importing + sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__]) + except Exception as e: + logger.debug("Failed to prepare package context for handlers: %s", e) + # Ensure we try importing under 'app.workers' + if "app.workers" not in bases_to_try: + bases_to_try.append("app.workers") + + # Also try common alternative base when the whole project is a package + for base in ("backend.app.workers", "app.workers"): + if base not in bases_to_try: + bases_to_try.append(base) + + for base in bases_to_try: + try: + importlib.import_module(base) + except Exception: + continue + for mod in handler_modules: + full_name = f"{base}.{mod}" + try: + importlib.import_module(full_name) + imported += 1 + except Exception as e: + logger.debug("Failed to import handler module %s: %s", full_name, e) + if imported: + break + + if not imported: + logger.warning( + "Failed to import any handler modules. Ensure the worker is executed " + "as a module (e.g., 'python -m backend.app.workers.queue_worker')." 
+ ) + else: + logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules)) + +# Trigger auto-discovery of handlers at import time +_auto_import_handlers() + +async def _consume(loop: asyncio.AbstractEventLoop) -> None: + import aio_pika + + # Mask password in URL for logging + safe_url = RABBITMQ_URL + with suppress(Exception): + if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL: + scheme, rest = RABBITMQ_URL.split("://", 1) + creds_host = rest.split("@") + if len(creds_host) == 2: + user_pass, host = creds_host + user = user_pass.split(":")[0] + safe_url = f"{scheme}://{user}:***@{host}" + + logger.info( + "Starting worker | pid=%s | host=%s | queue=%s | url=%s", + os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url, + ) + + connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop) + try: + channel = await connection.channel() + queue = await channel.declare_queue(QUEUE_NAME, durable=True) + logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME) + + async with queue.iterator() as queue_iter: + async for message in queue_iter: + async with message.process(requeue=False): + try: + payload = json.loads(message.body.decode("utf-8")) + except Exception as e: + logger.exception("Failed to decode message: %s", e) + continue + try: + await _dispatch(payload) + except Exception: + logger.exception("Unhandled error while processing message: %s", payload) + # message is not requeued due to process(requeue=False) + finally: + await connection.close() + logger.info("Worker stopped") + +async def main() -> None: + loop = asyncio.get_running_loop() + + stop_event = asyncio.Event() + + def _request_shutdown(signame: str) -> None: + logger.info("Received %s, shutting down gracefully…", signame) + stop_event.set() + + for sig in (signal.SIGINT, signal.SIGTERM): + with suppress(NotImplementedError): + loop.add_signal_handler(sig, _request_shutdown, sig.name) + + consumer = asyncio.create_task(_consume(loop)) + + # Wait until a stop 
signal arrives + await stop_event.wait() + + # Give the consumer some time to exit + consumer.cancel() + with suppress(asyncio.CancelledError): + await consumer + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/backend/worker/__init__.py b/backend/worker/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/backend/worker/email_worker.py b/backend/worker/email_worker.py deleted file mode 100644 index 5f927b6..0000000 --- a/backend/worker/email_worker.py +++ /dev/null @@ -1,56 +0,0 @@ -import asyncio -import json -import os -from typing import Any, Dict - -RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( - f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" - f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" - f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" - f"{os.getenv('RABBITMQ_PORT', '5672')}" -) -QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") - - -async def handle_message(message_body: bytes) -> None: - try: - data: Dict[str, Any] = json.loads(message_body.decode("utf-8")) - except Exception as e: - print(f"[email_worker] Failed to decode message: {e}") - return - - if data.get("type") != "email": - print(f"[email_worker] Unknown message type: {data}") - return - - to = data.get("to") - subject = data.get("subject") - body = data.get("body") - if not (to and subject and body): - print(f"[email_worker] Incomplete email message: {data}") - return - - try: - await send_email(to=to, subject=subject, body=body) - print(f"[email_worker] Sent email to {to}") - except Exception as e: - print(f"[email_worker] Error sending email to {to}: {e}") - - -async def main() -> None: - import aio_pika - - print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}") - connection = await aio_pika.connect_robust(RABBITMQ_URL) - channel = await connection.channel() - queue = await channel.declare_queue(QUEUE_NAME, durable=True) - print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...") 
- - async with queue.iterator() as queue_iter: - async for message in queue_iter: - async with message.process(requeue=False): - await handle_message(message.body) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml index f463a9c..a81cc7a 100644 --- a/deployment/app-demo-worker-deployment.yaml +++ b/deployment/app-demo-worker-deployment.yaml @@ -18,7 +18,7 @@ spec: name: app-demo-worker command: - python3 - - worker/email_worker.py + - app/workers/queue_worker.py env: - name: RABBITMQ_USERNAME value: demo-app From e31ec199c0cc2e6be7c8eedb38c553298630cbb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:04:20 +0200 Subject: [PATCH 2/9] Update backend/app/workers/queue_worker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/app/workers/queue_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index 6dbb7ad..ba65f6a 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -85,7 +85,7 @@ def _auto_import_handlers() -> None: # If executed as a script (no package), make 'app' importable and alias this # module as 'app.workers.queue_worker' so that handler relative imports work if not pkg_name: - backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend + backend_dir = os.path.abspath(os.path.join(__file__, '../../..')) # .../backend if backend_dir not in sys.path: sys.path.insert(0, backend_dir) try: From a0bc94d7ecd424b33a367d03238bfb2b8544df8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:04:30 +0200 Subject: [PATCH 3/9] Update backend/app/workers/queue_worker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/app/workers/queue_worker.py | 10 
++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index ba65f6a..a10b4c4 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -82,17 +82,15 @@ def _auto_import_handlers() -> None: if pkg_name: bases_to_try.append(pkg_name) - # If executed as a script (no package), make 'app' importable and alias this - # module as 'app.workers.queue_worker' so that handler relative imports work + # If executed as a script (no package), make 'app' importable so that handler modules + # can be imported using absolute imports. Handler modules must use absolute imports + # (e.g., 'from backend.app.workers.queue_worker import register_task_handler'). if not pkg_name: - backend_dir = os.path.abspath(os.path.join(__file__, '../../..')) # .../backend + backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend if backend_dir not in sys.path: sys.path.insert(0, backend_dir) try: importlib.import_module("app.workers") - # Alias the current module under the package name to satisfy - # 'from .queue_worker import ...' 
inside handlers without re-importing - sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__]) except Exception as e: logger.debug("Failed to prepare package context for handlers: %s", e) # Ensure we try importing under 'app.workers' From 233a331cba8f72c668fe35a88b13ae6d2678b1aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:04:37 +0200 Subject: [PATCH 4/9] Update backend/app/workers/queue_worker.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/app/workers/queue_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index a10b4c4..887404c 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -120,7 +120,7 @@ def _auto_import_handlers() -> None: if not imported: logger.warning( "Failed to import any handler modules. Ensure the worker is executed " - "as a module (e.g., 'python -m backend.app.workers.queue_worker')." + "as a module (e.g., 'python -m backend.app.workers.queue_worker') or as a script (e.g., 'python3 app/workers/queue_worker.py')." 
) else: logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules)) From 7cd96c830d941fe452bf3ef2357df7c978952238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:08:59 +0200 Subject: [PATCH 5/9] feat(infrastructure): update queue worker --- backend/app/workers/queue_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index 887404c..48c7078 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -91,6 +91,7 @@ def _auto_import_handlers() -> None: sys.path.insert(0, backend_dir) try: importlib.import_module("app.workers") + sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__]) except Exception as e: logger.debug("Failed to prepare package context for handlers: %s", e) # Ensure we try importing under 'app.workers' From 3e809782a6a81d0b1387f2007e89eff0b77ea85e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 13:59:01 +0200 Subject: [PATCH 6/9] feat(infrastructure): update queue worker --- backend/app/workers/__main__.py | 0 backend/app/workers/queue_worker.py | 66 ++++++---------------- deployment/app-demo-worker-deployment.yaml | 4 +- 3 files changed, 18 insertions(+), 52 deletions(-) create mode 100644 backend/app/workers/__main__.py diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py index 48c7078..abd769f 100644 --- a/backend/app/workers/queue_worker.py +++ b/backend/app/workers/queue_worker.py @@ -7,7 +7,6 @@ import socket from contextlib import suppress from typing import Any, Awaitable, Callable, Dict -# Build RabbitMQ URL consistent with core.queue RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" 
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" @@ -16,10 +15,9 @@ RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( ) QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") -# Logging configuration logger = logging.getLogger("queue_worker") -logger.propagate = False # prevent double logging via root logger -if not logger.handlers: # guard against multiple imports creating duplicate handlers +logger.propagate = False # prevent double logging +if not logger.handlers: # do not duplicate handlers _handler = logging.StreamHandler() _formatter = logging.Formatter( fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", @@ -29,17 +27,19 @@ if not logger.handlers: # guard against multiple imports creating duplicate han logger.addHandler(_handler) logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper()) -# Task handler registry TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]] _TASK_HANDLERS: Dict[str, TaskHandler] = {} + def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]: def decorator(func: TaskHandler) -> TaskHandler: _TASK_HANDLERS[task_type] = func logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__) return func + return decorator + async def _dispatch(message_dict: Dict[str, Any]) -> None: task_type = message_dict.get("type") if not task_type: @@ -51,18 +51,17 @@ async def _dispatch(message_dict: Dict[str, Any]) -> None: return await handler(message_dict) -# Auto-load all handler modules in this package so decorators can register -# without manual imports. Any file matching '*_handler.py' will be imported. 
def _auto_import_handlers() -> None: import importlib import os import pkgutil - import sys pkg_name = __package__ # e.g., 'backend.app.workers' when run as module dir_path = os.path.dirname(__file__) + print(f"Package: {pkg_name}, Dir: {dir_path}") + # Discover handler module base names in the filesystem handler_modules = [] for finder, name, ispkg in pkgutil.iter_modules([dir_path]): @@ -75,48 +74,14 @@ def _auto_import_handlers() -> None: imported = 0 - # Prefer importing using the current package name so relative imports in - # handler modules (e.g., from .queue_worker import register_task_handler) - # continue to work. - bases_to_try = [] - if pkg_name: - bases_to_try.append(pkg_name) - - # If executed as a script (no package), make 'app' importable so that handler modules - # can be imported using absolute imports. Handler modules must use absolute imports - # (e.g., 'from backend.app.workers.queue_worker import register_task_handler'). - if not pkg_name: - backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # .../backend - if backend_dir not in sys.path: - sys.path.insert(0, backend_dir) + importlib.import_module(__package__) + for mod in handler_modules: + full_name = f"{__package__}.{mod}" try: - importlib.import_module("app.workers") - sys.modules.setdefault("app.workers.queue_worker", sys.modules[__name__]) + importlib.import_module(full_name) + imported += 1 except Exception as e: - logger.debug("Failed to prepare package context for handlers: %s", e) - # Ensure we try importing under 'app.workers' - if "app.workers" not in bases_to_try: - bases_to_try.append("app.workers") - - # Also try common alternative base when the whole project is a package - for base in ("backend.app.workers", "app.workers"): - if base not in bases_to_try: - bases_to_try.append(base) - - for base in bases_to_try: - try: - importlib.import_module(base) - except Exception: - continue - for mod in handler_modules: - full_name = f"{base}.{mod}" - try: - 
importlib.import_module(full_name) - imported += 1 - except Exception as e: - logger.debug("Failed to import handler module %s: %s", full_name, e) - if imported: - break + logger.debug("Failed to import handler module %s: %s", full_name, e) if not imported: logger.warning( @@ -126,8 +91,6 @@ def _auto_import_handlers() -> None: else: logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules)) -# Trigger auto-discovery of handlers at import time -_auto_import_handlers() async def _consume(loop: asyncio.AbstractEventLoop) -> None: import aio_pika @@ -171,6 +134,7 @@ async def _consume(loop: asyncio.AbstractEventLoop) -> None: await connection.close() logger.info("Worker stopped") + async def main() -> None: loop = asyncio.get_running_loop() @@ -194,7 +158,9 @@ async def main() -> None: with suppress(asyncio.CancelledError): await consumer + if __name__ == "__main__": + _auto_import_handlers() try: asyncio.run(main()) except KeyboardInterrupt: diff --git a/deployment/app-demo-worker-deployment.yaml b/deployment/app-demo-worker-deployment.yaml index a81cc7a..8520408 100644 --- a/deployment/app-demo-worker-deployment.yaml +++ b/deployment/app-demo-worker-deployment.yaml @@ -17,8 +17,8 @@ spec: - image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b name: app-demo-worker command: - - python3 - - app/workers/queue_worker.py + - python3 -m + - app.workers.queue_worker env: - name: RABBITMQ_USERNAME value: demo-app From 49efd88f29a8355e070ace9c5d9f7162bcd78211 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Trkan?= Date: Thu, 2 Oct 2025 14:32:07 +0200 Subject: [PATCH 7/9] feat(infrastructure): use celery worker --- backend/app/core/queue.py | 53 +++---- backend/app/services/user_service.py | 4 +- backend/app/workers/__main__.py | 0 backend/app/workers/email_handler.py | 29 ---- backend/app/workers/queue_worker.py | 167 --------------------- backend/requirements.txt | 14 ++ 
deployment/app-demo-worker-deployment.yaml | 10 +- 7 files changed, 47 insertions(+), 230 deletions(-) delete mode 100644 backend/app/workers/__main__.py delete mode 100644 backend/app/workers/email_handler.py delete mode 100644 backend/app/workers/queue_worker.py diff --git a/backend/app/core/queue.py b/backend/app/core/queue.py index 2aecf9e..f899a1a 100644 --- a/backend/app/core/queue.py +++ b/backend/app/core/queue.py @@ -1,35 +1,28 @@ -import json -import os -from typing import Any, Dict -import asyncio +# Route email jobs via Celery instead of raw RabbitMQ publishing +# Ensure Celery app is initialized so producers use correct broker credentials. +# Import has side effects (sets Celery default app), safe to keep at module level. +import app.celery_app # noqa: F401 -RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( - f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" - f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" - f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" - f"{os.getenv('RABBITMQ_PORT', '5672')}" -) -QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") +# Use a lazy proxy so _send_email_task is never None, even if Celery modules +# are not yet importable at import time (e.g., different working dir during startup). 
+class _LazySendTask: + def delay(self, to: str, subject: str, body: str): # type: ignore[no-untyped-def] + # Late import on first use + from app.workers.celery_tasks import send_email_fields as _send # type: ignore + return _send.delay(to, subject, body) + + +try: + # Try eager import for normal operation + from app.workers.celery_tasks import send_email_fields as _send_email_task # type: ignore +except ImportError: # pragma: no cover - fallback lazy import proxy + _send_email_task = _LazySendTask() # type: ignore -async def _publish_async(message: Dict[str, Any]) -> None: - import aio_pika - connection = await aio_pika.connect_robust(RABBITMQ_URL) - try: - channel = await connection.channel() - await channel.declare_queue(QUEUE_NAME, durable=True) - body = json.dumps(message).encode("utf-8") - await channel.default_exchange.publish( - aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT), - routing_key=QUEUE_NAME, - ) - finally: - await connection.close() def enqueue_email(to: str, subject: str, body: str) -> None: - message = {"type": "email", "to": to, "subject": subject, "body": body} - try: - loop = asyncio.get_running_loop() - loop.create_task(_publish_async(message)) - except RuntimeError: - asyncio.run(_publish_async(message)) + """Enqueue an email job using Celery. + + Keeps the same public API as before. 
+ """ + _send_email_task.delay(to, subject, body) diff --git a/backend/app/services/user_service.py b/backend/app/services/user_service.py index e92d9d3..1d3857e 100644 --- a/backend/app/services/user_service.py +++ b/backend/app/services/user_service.py @@ -7,8 +7,8 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin from fastapi_users.authentication import ( AuthenticationBackend, BearerTransport, - JWTStrategy, ) +from fastapi_users.authentication.strategy.jwt import JWTStrategy from fastapi_users.db import SQLAlchemyUserDatabase from app.models.user import User @@ -47,7 +47,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]): ) try: enqueue_email(to=user.email, subject=subject, body=body) - except Exception: + except Exception as e: print("[Email Fallback] To:", user.email) print("[Email Fallback] Subject:", subject) print("[Email Fallback] Body:\n", body) diff --git a/backend/app/workers/__main__.py b/backend/app/workers/__main__.py deleted file mode 100644 index e69de29..0000000 diff --git a/backend/app/workers/email_handler.py b/backend/app/workers/email_handler.py deleted file mode 100644 index cad8797..0000000 --- a/backend/app/workers/email_handler.py +++ /dev/null @@ -1,29 +0,0 @@ -import asyncio -from typing import Any, Dict - -# Import decorator and logger from the worker so handlers can register themselves -from .queue_worker import register_task_handler, logger - - -@register_task_handler("email") -async def handle_email_task(payload: Dict[str, Any]) -> None: - """Handle 'email' tasks dispatched by the queue worker. - - Expected payload schema: - { - "type": "email", - "to": "recipient@example.com", - "subject": "Subject text", - "body": "Email body text" - } - """ - to = payload.get("to") - subject = payload.get("subject") - body = payload.get("body") - if not (to and subject and body): - logger.error("Email task missing fields. 
Payload: %s", payload) - return - - # Placeholder for real email sending logic - await asyncio.sleep(0) # yield control, simulate async work - logger.info("Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body)) diff --git a/backend/app/workers/queue_worker.py b/backend/app/workers/queue_worker.py deleted file mode 100644 index abd769f..0000000 --- a/backend/app/workers/queue_worker.py +++ /dev/null @@ -1,167 +0,0 @@ -import asyncio -import json -import logging -import os -import signal -import socket -from contextlib import suppress -from typing import Any, Awaitable, Callable, Dict - -RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( - f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" - f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" - f"{os.getenv('RABBITMQ_HOST', 'localhost')}:" - f"{os.getenv('RABBITMQ_PORT', '5672')}" -) -QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") - -logger = logging.getLogger("queue_worker") -logger.propagate = False # prevent double logging -if not logger.handlers: # do not duplicate handlers - _handler = logging.StreamHandler() - _formatter = logging.Formatter( - fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - _handler.setFormatter(_formatter) - logger.addHandler(_handler) -logger.setLevel(os.getenv("WORKER_LOG_LEVEL", "INFO").upper()) - -TaskHandler = Callable[[Dict[str, Any]], Awaitable[None]] -_TASK_HANDLERS: Dict[str, TaskHandler] = {} - - -def register_task_handler(task_type: str) -> Callable[[TaskHandler], TaskHandler]: - def decorator(func: TaskHandler) -> TaskHandler: - _TASK_HANDLERS[task_type] = func - logger.debug("Registered task handler for type='%s' -> %s", task_type, func.__name__) - return func - - return decorator - - -async def _dispatch(message_dict: Dict[str, Any]) -> None: - task_type = message_dict.get("type") - if not task_type: - logger.warning("Skipping message without 'type': %s", message_dict) - return - handler = 
async def _dispatch(message_dict: Dict[str, Any]) -> None:
    """Route a decoded message to the handler registered for its 'type' key."""
    task_type = message_dict.get("type")
    handler = _TASK_HANDLERS.get(task_type)
    if not handler:
        logger.warning("No handler for type='%s'. Message: %s", task_type, message_dict)
        return
    await handler(message_dict)


def _auto_import_handlers() -> None:
    """Import every sibling '*_handler' module so their @register_task_handler
    decorators run and populate _TASK_HANDLERS.

    Works both when executed as a package module
    ('python -m backend.app.workers.queue_worker') and as a plain script
    ('python3 app/workers/queue_worker.py'), where __package__ is empty.
    """
    import importlib
    import os
    import pkgutil

    pkg_name = __package__  # empty/None when run as a plain script
    dir_path = os.path.dirname(__file__)
    # Fix: this was a stray debug print(); route it through the logger.
    logger.debug("Discovering handlers | package=%s | dir=%s", pkg_name, dir_path)

    # Discover handler module base names in the filesystem.
    handler_modules = [
        name
        for _finder, name, ispkg in pkgutil.iter_modules([dir_path])
        if not ispkg and name.endswith("_handler")
    ]
    if not handler_modules:
        logger.info("No handler modules discovered in %s", dir_path)
        return

    imported = 0
    if pkg_name:
        # Ensure the parent package is initialised before importing submodules.
        importlib.import_module(pkg_name)
    for mod in handler_modules:
        # Fix: in script mode there is no package, so import by bare name
        # instead of producing 'None.<mod>' / importing an empty package name.
        full_name = f"{pkg_name}.{mod}" if pkg_name else mod
        try:
            importlib.import_module(full_name)
            imported += 1
        except Exception as e:
            logger.debug("Failed to import handler module %s: %s", full_name, e)

    if not imported:
        logger.warning(
            "Failed to import any handler modules. Ensure the worker is executed "
            "as a module (e.g., 'python -m backend.app.workers.queue_worker') or "
            "as a script (e.g., 'python3 app/workers/queue_worker.py')."
        )
    else:
        logger.info("Imported %d handler module(s): %s", imported, ", ".join(handler_modules))


async def _consume(loop: asyncio.AbstractEventLoop) -> None:
    """Connect to RabbitMQ and process messages from QUEUE_NAME until cancelled.

    Messages are JSON-decoded and dispatched; failures are logged and the
    message is acked (never requeued) so a poison message cannot loop forever.
    """
    import aio_pika

    # Mask the password when logging the broker URL.
    safe_url = RABBITMQ_URL
    with suppress(Exception):
        if "@" in RABBITMQ_URL and "://" in RABBITMQ_URL:
            scheme, rest = RABBITMQ_URL.split("://", 1)
            creds_host = rest.split("@")
            if len(creds_host) == 2:
                user_pass, host = creds_host
                user = user_pass.split(":")[0]
                safe_url = f"{scheme}://{user}:***@{host}"

    logger.info(
        "Starting worker | pid=%s | host=%s | queue=%s | url=%s",
        os.getpid(), socket.gethostname(), QUEUE_NAME, safe_url,
    )

    connection = await aio_pika.connect_robust(RABBITMQ_URL, loop=loop)
    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)
        logger.info("Waiting for messages on queue='%s' …", QUEUE_NAME)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception as e:
                        logger.exception("Failed to decode message: %s", e)
                        continue
                    try:
                        await _dispatch(payload)
                    except Exception:
                        logger.exception("Unhandled error while processing message: %s", payload)
                        # message is not requeued due to process(requeue=False)
    finally:
        await connection.close()
        logger.info("Worker stopped")


async def main() -> None:
    """Run the consumer until SIGINT/SIGTERM, or until the consumer itself exits."""
    loop = asyncio.get_running_loop()

    stop_event = asyncio.Event()

    def _request_shutdown(signame: str) -> None:
        logger.info("Received %s, shutting down gracefully…", signame)
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler is unavailable on some platforms (e.g. Windows).
        with suppress(NotImplementedError):
            loop.add_signal_handler(sig, _request_shutdown, sig.name)

    consumer = asyncio.create_task(_consume(loop))
    stopper = asyncio.create_task(stop_event.wait())

    # Fix: also wake up if the consumer dies on its own (e.g. broker
    # permanently unreachable) instead of hanging on the stop event forever.
    await asyncio.wait({consumer, stopper}, return_when=asyncio.FIRST_COMPLETED)

    stopper.cancel()
    consumer.cancel()
    # Awaiting re-raises any real consumer error so the process exits non-zero;
    # plain cancellation is expected and suppressed.
    with suppress(asyncio.CancelledError):
        await consumer


if __name__ == "__main__":
    _auto_import_handlers()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
import os

from celery import Celery

# Broker URL: an explicit RABBITMQ_URL wins outright; otherwise compose it
# from the individual RABBITMQ_* parts (with vhost and optional TLS support).
if os.getenv("RABBITMQ_URL"):
    RABBITMQ_URL = os.getenv("RABBITMQ_URL")  # type: ignore
else:
    from urllib.parse import quote

    username = os.getenv("RABBITMQ_USERNAME", "user")
    password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
    host = os.getenv("RABBITMQ_HOST", "localhost")
    port = os.getenv("RABBITMQ_PORT", "5672")
    vhost = os.getenv("RABBITMQ_VHOST", "/")
    use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
    scheme = "amqps" if use_ssl else "amqp"

    # Kombu uses '//' to denote the default '/' vhost. For custom vhosts,
    # URL-encode them.
    if vhost in ("/", ""):
        vhost_path = "/"  # will become '//' after concatenation below
    else:
        vhost_path = f"/{quote(vhost, safe='')}"

    # Fix: URL-encode the credentials too — a generated password containing
    # '@', ':' or '/' would otherwise corrupt/misparse the URL. The vhost was
    # already quoted; the user/pass were not.
    RABBITMQ_URL = (
        f"{scheme}://{quote(username, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}{vhost_path}"
    )
    if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
        RABBITMQ_URL += "/"

# Default queue name to keep parity with the previous worker.
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")

# Use RPC backend by default to avoid coupling to Redis.
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")

celery_app = Celery(
    "app",
    broker=RABBITMQ_URL,
    backend=CELERY_BACKEND,
    include=[
        "app.workers.celery_tasks",
    ],
)

# Ensure this Celery app becomes the default for producers (e.g., FastAPI
# process) so that @shared_task.delay(...) uses the same broker/credentials
# as the worker.
celery_app.set_default()

# Basic, safe defaults — single prefetch helps fairness similarly to the old worker.
celery_app.conf.update(
    task_default_queue=DEFAULT_QUEUE,
    task_acks_late=True,
    worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    broker_heartbeat=0,  # let kombu handle heartbeats robustly in some envs
)

# Expose a shortcut for Celery CLI discovery: `celery -A app.celery_app worker ...`
__all__ = ["celery_app"]
@shared_task(name="workers.send_email")
def send_email(payload: Dict[str, Any]) -> None:
    """Deliver one email described by a payload dict.

    The payload keeps the old queue-worker contract:
    ``{"type": "email", "to": ..., "subject": ..., "body": ...}``.
    All three of to/subject/body must be present and non-empty; otherwise the
    task logs an error and gives up.
    """
    to = payload.get("to")
    subject = payload.get("subject")
    body = payload.get("body")
    if to and subject and body:
        # Placeholder for real email sending logic
        # Implement SMTP provider call here
        logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
    else:
        logger.error("Email task missing fields. Payload: %s", payload)


@shared_task(name="workers.send_email_fields")
def send_email_fields(to: str, subject: str, body: str) -> None:
    """Convenience variant taking explicit fields; delegates to send_email."""
    payload = {"type": "email", "to": to, "subject": subject, "body": body}
    send_email(payload)
# Default queue name — parity with the previous raw-RabbitMQ worker.
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")

# Kept for callers that import it; the app currently runs without a result
# backend (tasks are fire-and-forget), so it is not passed to Celery below.
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")

# Fix: dropped the commented-out `backend=CELERY_BACKEND` dead code that was
# left inside the constructor call.
celery_app = Celery("app", broker=RABBITMQ_URL)
# Discover app.workers.celery_tasks so its @shared_task functions register.
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks")

# Make this the default app so producers (e.g. the FastAPI process) publish
# with the same broker/credentials as the worker.
celery_app.set_default()

celery_app.conf.update(
    task_default_queue=DEFAULT_QUEUE,
    task_acks_late=True,  # ack after the task ran, so a crash re-delivers it
    worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)

__all__ = ["celery_app"]
# --- backend/app/core/queue.py -------------------------------------------

def enqueue_email(to: str, subject: str, body: str) -> None:
    """Public enqueue API: dispatch the Celery 'workers.send_email' task."""
    send_email.delay(to, subject, body)


# --- backend/app/workers/celery_tasks.py ---------------------------------

@shared_task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
    """Deliver one email. All fields must be non-empty; otherwise the task
    logs an error and gives up (no retry)."""
    if all((to, subject, body)):
        # Placeholder for real email sending logic
        logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
        return
    logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)