Compare commits

17 Commits

Author SHA1 Message Date
145565b542 feat(infrastructure): use celery worker 2025-10-02 14:59:44 +02:00
9a436d3c70 feat(infrastructure): use celery worker 2025-10-02 14:32:37 +02:00
49efd88f29 feat(infrastructure): use celery worker 2025-10-02 14:32:07 +02:00
3e809782a6 feat(infrastructure): update queue worker 2025-10-02 13:59:01 +02:00
7cd96c830d feat(infrastructure): update queue worker 2025-10-02 13:08:59 +02:00
233a331cba Update backend/app/workers/queue_worker.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-02 13:04:37 +02:00
a0bc94d7ec Update backend/app/workers/queue_worker.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-02 13:04:30 +02:00
e31ec199c0 Update backend/app/workers/queue_worker.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-02 13:04:20 +02:00
6d8b760a7d feat(infrastructure): update queue worker 2025-10-02 13:00:57 +02:00
github-actions[bot]
42f3d4dae1 Merge pull request #1 from dat515-2025/merge/refactor_migrations
Some checks failed
Build, Push and Update Image in Manifest / build-and-update (push) Has been cancelled
refactor(backend): refactor project, add database migrations support
2025-09-30 11:08:31 +00:00
57d8f169da Merge pull request #1 from dat515-2025/merge/refactor_migrations
refactor(backend): refactor project, add database migrations support
2025-09-30 13:07:41 +02:00
8a718d6f59 move design 2025-09-29 12:08:45 +02:00
4af6d34507 refactor(backend): remove circular dependency 2025-09-24 20:25:00 +02:00
615803de2d refactor(backend): solve copilot comments 2025-09-24 20:15:25 +02:00
106497e791 refactor(backend): solve copilot comments 2025-09-24 20:12:45 +02:00
f4892a69d5 refactor(backend): solve copilot comments 2025-09-24 20:10:31 +02:00
3c8ad5f74f refactor(backend): refactor project, add database migrations support 2025-09-24 19:42:04 +02:00
32 changed files with 470 additions and 174 deletions

View File

@@ -4,4 +4,4 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
COPY . . COPY . .
EXPOSE 8000 EXPOSE 8000
CMD ["uvicorn", "app.app:app", "--host", "0.0.0.0", "--port", "8000"] CMD alembic upgrade head && uvicorn app.app:app --host 0.0.0.0 --port 8000

148
backend/alembic.ini Normal file
View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# sqlalchemy.url = driver://user:pass@localhost/dbname
# Pro async MariaDB bude url brána z proměnné prostředí DATABASE_URL
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

55
backend/alembic/env.py Normal file
View File

@@ -0,0 +1,55 @@
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool, create_engine
from alembic import context
# Add path for correct loading of modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.core.db import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
mariadb_host = os.getenv("MARIADB_HOST", "localhost")
mariadb_port = os.getenv("MARIADB_PORT", "3306")
mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
DATABASE_URL = f"mysql+pymysql://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
# Use synchronous driver for Alembic migrations
SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql")
def run_migrations_offline() -> None:
context.configure(
url=SYNC_DATABASE_URL,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = create_engine(SYNC_DATABASE_URL, poolclass=pool.NullPool)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,52 @@
"""Init migration
Revision ID: 81f275275556
Revises:
Create Date: 2025-09-24 17:39:25.346690
"""
from typing import Sequence, Union
import fastapi_users_db_sqlalchemy
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '81f275275556'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('transaction',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('amount', sa.Float(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user',
sa.Column('first_name', sa.String(length=100), nullable=True),
sa.Column('last_name', sa.String(length=100), nullable=True),
sa.Column('id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.Column('email', sa.String(length=320), nullable=False),
sa.Column('hashed_password', sa.String(length=1024), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('is_superuser', sa.Boolean(), nullable=False),
sa.Column('is_verified', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')
op.drop_table('transaction')
# ### end Alembic commands ###

View File

View File

@@ -1,9 +1,10 @@
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from .db import User, create_db_and_tables from app.models.user import User
from .schemas import UserCreate, UserRead, UserUpdate
from .users import auth_backend, current_active_verified_user, fastapi_users from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users
app = FastAPI() app = FastAPI()
@@ -53,9 +54,3 @@ async def root():
@app.get("/authenticated-route") @app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_verified_user)): async def authenticated_route(user: User = Depends(current_active_verified_user)):
return {"message": f"Hello {user.email}!"} return {"message": f"Hello {user.email}!"}
@app.on_event("startup")
async def on_startup():
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()

50
backend/app/celery_app.py Normal file
View File

@@ -0,0 +1,50 @@
import os
from celery import Celery
if os.getenv("RABBITMQ_URL"):
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
else:
from urllib.parse import quote
username = os.getenv("RABBITMQ_USERNAME", "user")
password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
host = os.getenv("RABBITMQ_HOST", "localhost")
port = os.getenv("RABBITMQ_PORT", "5672")
vhost = os.getenv("RABBITMQ_VHOST", "/")
use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
scheme = "amqps" if use_ssl else "amqp"
# Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
if vhost in ("/", ""):
vhost_path = "/" # will become '//' after concatenation below
else:
vhost_path = f"/{quote(vhost, safe='')}"
# Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/')
RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
RABBITMQ_URL += "/"
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
celery_app = Celery(
"app",
broker=RABBITMQ_URL,
# backend=CELERY_BACKEND,
)
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
celery_app.set_default()
celery_app.conf.update(
task_default_queue=DEFAULT_QUEUE,
task_acks_late=True,
worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
task_serializer="json",
result_serializer="json",
accept_content=["json"],
)
__all__ = ["celery_app"]

View File

4
backend/app/core/base.py Normal file
View File

@@ -0,0 +1,4 @@
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
Base: DeclarativeMeta = declarative_base()

View File

@@ -1,10 +1,6 @@
import os import os
from typing import AsyncGenerator from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from app.core.base import Base
from fastapi import Depends
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
DATABASE_URL = os.getenv("DATABASE_URL") DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL: if not DATABASE_URL:
@@ -13,21 +9,15 @@ if not DATABASE_URL:
mariadb_db = os.getenv("MARIADB_DB", "group_project") mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root") mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword") mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
#always use SSL except for localhost - i dont want to include certs
if mariadb_host and mariadb_db and mariadb_user and mariadb_password: if mariadb_host and mariadb_db and mariadb_user and mariadb_password:
# Use MariaDB/MySQL over async driver
DATABASE_URL = f"mysql+asyncmy://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}" DATABASE_URL = f"mysql+asyncmy://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
else: else:
raise Exception("Only MariaDB is supported. Please set the DATABASE_URL environment variable.") raise Exception("Only MariaDB is supported. Please set the DATABASE_URL environment variable.")
Base: DeclarativeMeta = declarative_base() # Load all models to register them
from app.models.user import User
from app.models.transaction import Transaction
class User(SQLAlchemyBaseUserTableUUID, Base):
pass
# Nastavení connect_args pro SSL pouze pokud není localhost
ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost" ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost"
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {} connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
@@ -38,17 +28,3 @@ engine = create_async_engine(
connect_args=connect_args, connect_args=connect_args,
) )
async_session_maker = async_sessionmaker(engine, expire_on_commit=False) async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@@ -0,0 +1,6 @@
import app.celery_app # noqa: F401
from app.workers.celery_tasks import send_email
def enqueue_email(to: str, subject: str, body: str) -> None:
send_email.delay(to, subject, body)

View File

View File

@@ -0,0 +1,9 @@
from sqlalchemy import Column, Integer, String, Float
from app.core.base import Base
class Transaction(Base):
__tablename__ = "transaction"
id = Column(Integer, primary_key=True, autoincrement=True)
amount = Column(Float, nullable=False)
description = Column(String(length=255), nullable=True)

View File

@@ -0,0 +1,7 @@
from sqlalchemy import Column, String
from fastapi_users.db import SQLAlchemyBaseUserTableUUID
from app.core.base import Base
class User(SQLAlchemyBaseUserTableUUID, Base):
first_name = Column(String(length=100), nullable=True)
last_name = Column(String(length=100), nullable=True)

View File

@@ -1,47 +0,0 @@
import json
import os
from typing import Any, Dict
import asyncio
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
async def _publish_async(message: Dict[str, Any]) -> None:
# Import locally to avoid hard dependency at import-time
import aio_pika
connection = await aio_pika.connect_robust(RABBITMQ_URL)
try:
channel = await connection.channel()
await channel.declare_queue(QUEUE_NAME, durable=True)
body = json.dumps(message).encode("utf-8")
await channel.default_exchange.publish(
aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
routing_key=QUEUE_NAME,
)
finally:
await connection.close()
def enqueue_email(to: str, subject: str, body: str) -> None:
"""
Enqueue an email to RabbitMQ. If RabbitMQ or aio_pika is not available,
this function will raise ImportError/ConnectionError. The caller may
implement fallback (e.g., direct send).
"""
message = {"type": "email", "to": to, "subject": subject, "body": body}
try:
loop = asyncio.get_running_loop()
# Fire-and-forget task so we don't block the request path
loop.create_task(_publish_async(message))
except RuntimeError:
# No running loop (e.g., called from sync context) run a short loop
asyncio.run(_publish_async(message))

View File

@@ -1,15 +0,0 @@
import uuid
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
pass
class UserCreate(schemas.BaseUserCreate):
pass
class UserUpdate(schemas.BaseUserUpdate):
pass

View File

View File

@@ -0,0 +1,16 @@
import uuid
from typing import Optional
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
first_name: Optional[str] = None
surname: Optional[str] = None
class UserCreate(schemas.BaseUserCreate):
first_name: Optional[str] = None
surname: Optional[str] = None
class UserUpdate(schemas.BaseUserUpdate):
first_name: Optional[str] = None
surname: Optional[str] = None

View File

View File

@@ -0,0 +1,14 @@
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_users.db import SQLAlchemyUserDatabase
from ..core.db import async_session_maker
from ..models.user import User
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@@ -7,23 +7,23 @@ from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
from fastapi_users.authentication import ( from fastapi_users.authentication import (
AuthenticationBackend, AuthenticationBackend,
BearerTransport, BearerTransport,
JWTStrategy,
) )
from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase from fastapi_users.db import SQLAlchemyUserDatabase
from .db import User, get_user_db from app.models.user import User
from app.services.db import get_user_db
from app.core.queue import enqueue_email
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET") SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173") FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]): class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET reset_password_token_secret = SECRET
verification_token_secret = SECRET verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None): async def on_after_register(self, user: User, request: Optional[Request] = None):
# Ask FastAPI Users to generate a verification token and trigger the hook below
await self.request_verify(user, request) await self.request_verify(user, request)
async def on_after_forgot_password( async def on_after_forgot_password(
@@ -34,7 +34,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
async def on_after_request_verify( async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None self, user: User, token: str, request: Optional[Request] = None
): ):
# Build verification email and send through RabbitMQ (with direct SMTP fallback)
verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}" verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}"
verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}" verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}"
subject = "Ověření účtu" subject = "Ověření účtu"
@@ -47,26 +46,20 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
"Pokud jsi registraci neprováděl(a), tento email ignoruj.\n" "Pokud jsi registraci neprováděl(a), tento email ignoruj.\n"
) )
try: try:
from .queue import enqueue_email
enqueue_email(to=user.email, subject=subject, body=body) enqueue_email(to=user.email, subject=subject, body=body)
except Exception: except Exception as e:
# Fallback: if queue is unavailable, log the email content (dev fallback)
print("[Email Fallback] To:", user.email) print("[Email Fallback] To:", user.email)
print("[Email Fallback] Subject:", subject) print("[Email Fallback] Subject:", subject)
print("[Email Fallback] Body:\n", body) print("[Email Fallback] Body:\n", body)
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)): async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db) yield UserManager(user_db)
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login") bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy: def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET, lifetime_seconds=3600) return JWTStrategy(secret=SECRET, lifetime_seconds=3600)
auth_backend = AuthenticationBackend( auth_backend = AuthenticationBackend(
name="jwt", name="jwt",
transport=bearer_transport, transport=bearer_transport,
@@ -77,3 +70,4 @@ fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True) current_active_user = fastapi_users.current_user(active=True)
current_active_verified_user = fastapi_users.current_user(active=True, verified=True) current_active_verified_user = fastapi_users.current_user(active=True, verified=True)

View File

View File

@@ -0,0 +1,19 @@
import logging
from celery import shared_task
logger = logging.getLogger("celery_tasks")
if not logger.handlers:
_h = logging.StreamHandler()
logger.addHandler(_h)
logger.setLevel(logging.INFO)
@shared_task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
if not (to and subject and body):
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
return
# Placeholder for real email sending logic
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))

View File

@@ -1,14 +1,21 @@
aio-pika==9.5.6 aio-pika==9.5.6
aiormq==6.8.1 aiormq==6.8.1
aiosqlite==0.21.0 aiosqlite==0.21.0
alembic==1.16.5
amqp==5.3.1
annotated-types==0.7.0 annotated-types==0.7.0
anyio==4.11.0 anyio==4.11.0
argon2-cffi==23.1.0 argon2-cffi==23.1.0
argon2-cffi-bindings==25.1.0 argon2-cffi-bindings==25.1.0
asyncmy==0.2.9 asyncmy==0.2.9
bcrypt==4.3.0 bcrypt==4.3.0
billiard==4.2.2
celery==5.5.3
cffi==2.0.0 cffi==2.0.0
click==8.1.8 click==8.1.8
click-didyoumean==0.3.1
click-plugins==1.1.1.2
click-repl==0.3.0
cryptography==46.0.1 cryptography==46.0.1
dnspython==2.7.0 dnspython==2.7.0
email_validator==2.2.0 email_validator==2.2.0
@@ -20,25 +27,37 @@ greenlet==3.2.4
h11==0.16.0 h11==0.16.0
httptools==0.6.4 httptools==0.6.4
idna==3.10 idna==3.10
kombu==5.5.4
makefun==1.16.0 makefun==1.16.0
Mako==1.3.10
MarkupSafe==3.0.2
multidict==6.6.4 multidict==6.6.4
packaging==25.0
pamqp==3.3.0 pamqp==3.3.0
prompt_toolkit==3.0.52
propcache==0.3.2 propcache==0.3.2
pwdlib==0.2.1 pwdlib==0.2.1
pycparser==2.23 pycparser==2.23
pydantic==2.11.9 pydantic==2.11.9
pydantic_core==2.33.2 pydantic_core==2.33.2
PyJWT==2.10.1 PyJWT==2.10.1
PyMySQL==1.1.2
python-dateutil==2.9.0.post0
python-dotenv==1.1.1 python-dotenv==1.1.1
python-multipart==0.0.20 python-multipart==0.0.20
PyYAML==6.0.2 PyYAML==6.0.2
six==1.17.0
sniffio==1.3.1 sniffio==1.3.1
SQLAlchemy==2.0.43 SQLAlchemy==2.0.43
starlette==0.48.0 starlette==0.48.0
tomli==2.2.1
typing-inspection==0.4.1 typing-inspection==0.4.1
typing_extensions==4.15.0 typing_extensions==4.15.0
tzdata==2025.2
uvicorn==0.37.0 uvicorn==0.37.0
uvloop==0.21.0 uvloop==0.21.0
vine==5.1.0
watchfiles==1.1.0 watchfiles==1.1.0
wcwidth==0.2.14
websockets==15.0.1 websockets==15.0.1
yarl==1.20.1 yarl==1.20.1

View File

@@ -1,57 +0,0 @@
import asyncio
import json
import os
from typing import Any, Dict
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"
f"{os.getenv('RABBITMQ_HOST', 'localhost')}:"
f"{os.getenv('RABBITMQ_PORT', '5672')}"
)
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
async def handle_message(message_body: bytes) -> None:
try:
data: Dict[str, Any] = json.loads(message_body.decode("utf-8"))
except Exception as e:
print(f"[email_worker] Failed to decode message: {e}")
return
if data.get("type") != "email":
print(f"[email_worker] Unknown message type: {data}")
return
to = data.get("to")
subject = data.get("subject")
body = data.get("body")
if not (to and subject and body):
print(f"[email_worker] Incomplete email message: {data}")
return
try:
await send_email(to=to, subject=subject, body=body)
print(f"[email_worker] Sent email to {to}")
except Exception as e:
print(f"[email_worker] Error sending email to {to}: {e}")
async def main() -> None:
import aio_pika
print(f"[email_worker] Connecting to RabbitMQ at {RABBITMQ_URL}")
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
print(f"[email_worker] Waiting for messages in queue '{QUEUE_NAME}' ...")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process(requeue=False):
await handle_message(message.body)
if __name__ == "__main__":
asyncio.run(main())

11
create_migration.sh Normal file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
if [ -z "$1" ]; then
echo "Usage: $0 <migration_message>"
exit 1
fi
cd backend || { echo "Directory 'backend' does not exist"; exit 1; }
alembic revision --autogenerate -m "$1"
git add alembic/versions/*
cd - || exit

View File

@@ -14,7 +14,7 @@ spec:
app: app-demo app: app-demo
spec: spec:
containers: containers:
- image: lukastrkan/cc-app-demo@sha256:84cbe8181c87c32579b00e44d9c15e2db6d4a5c1e73577e517832b76bf337c59 - image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
name: app-demo name: app-demo
ports: ports:
- containerPort: 8000 - containerPort: 8000

View File

@@ -14,11 +14,17 @@ spec:
app: app-demo-worker app: app-demo-worker
spec: spec:
containers: containers:
- image: lukastrkan/cc-app-demo@sha256:84cbe8181c87c32579b00e44d9c15e2db6d4a5c1e73577e517832b76bf337c59 - image: lukastrkan/cc-app-demo@sha256:d320eefb9dee05dc0f0ec5a2ca90daae7ca8c2af0088dc6b88eee076486c0f3b
name: app-demo-worker name: app-demo-worker
command: command:
- python3 - celery
- worker/email_worker.py - -A
- app.celery_app
- worker
- -Q
- $(MAIL_QUEUE)
- --loglevel
- INFO
env: env:
- name: RABBITMQ_USERNAME - name: RABBITMQ_USERNAME
value: demo-app value: demo-app

6
upgrade_database.sh Normal file
View File

@@ -0,0 +1,6 @@
#!/bin/bash
# Script to upgrade the database using Alembic
cd backend || { echo "Directory 'backend' does not exist"; exit 1; }
alembic upgrade head