refactor(backend): refactor project, add database migrations support

This commit is contained in:
2025-09-24 19:42:04 +02:00
parent b880670929
commit 3c8ad5f74f
33 changed files with 408 additions and 77 deletions

View File

@@ -4,4 +4,4 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
COPY . . COPY . .
EXPOSE 8000 EXPOSE 8000
CMD ["uvicorn", "app.app:app", "--host", "0.0.0.0", "--port", "8000"] CMD alembic upgrade head && uvicorn app.app:app --host 0.0.0.0 --port 8000

24
backend/README.md Normal file
View File

@@ -0,0 +1,24 @@
# Backend automatické migrace (Alembic)
## Automatické migrace
Při spuštění backendu v Dockeru se automaticky provedou migrace databáze pomocí Alembic (`alembic upgrade head`).
## Ruční práce s migracemi
- Vytvoření nové migrace podle modelů:
```sh
export DATABASE_URL='mysql+asyncmy://root:strongpassword@localhost:3306/group_project'
alembic revision --autogenerate -m "popis migrace"
```
- Aplikace migrací:
```sh
export DATABASE_URL='mysql+asyncmy://root:strongpassword@localhost:3306/group_project'
alembic upgrade head
```
## Poznámky
- Pro autogeneraci migrací je nutné mít nainstalován balíček `pymysql`.
- Modely pro migrace jsou v `app/db.py`.
- Konfigurace Alembic je v `alembic.ini` a `alembic/env.py`.

148
backend/alembic.ini Normal file
View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# sqlalchemy.url = driver://user:pass@localhost/dbname
# Pro async MariaDB bude url brána z proměnné prostředí DATABASE_URL
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
backend/alembic/README Normal file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

56
backend/alembic/env.py Normal file
View File

@@ -0,0 +1,56 @@
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool, create_engine
from alembic import context
# Přidání backend do sys.path pro správný import
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.core.db import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
# fallback na synchronní url pro Alembic (používá sync driver)
mariadb_host = os.getenv("MARIADB_HOST", "localhost")
mariadb_port = os.getenv("MARIADB_PORT", "3306")
mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
DATABASE_URL = f"mysql+pymysql://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
# Vždy používej synchronní engine pro migrace
SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql")
def run_migrations_offline() -> None:
context.configure(
url=SYNC_DATABASE_URL,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = create_engine(SYNC_DATABASE_URL, poolclass=pool.NullPool)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,52 @@
"""Init migration
Revision ID: 81f275275556
Revises:
Create Date: 2025-09-24 17:39:25.346690
"""
from typing import Sequence, Union
import fastapi_users_db_sqlalchemy
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '81f275275556'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('transaction',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('amount', sa.Float(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user',
sa.Column('first_name', sa.String(length=100), nullable=True),
sa.Column('last_name', sa.String(length=100), nullable=True),
sa.Column('id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.Column('email', sa.String(length=320), nullable=False),
sa.Column('hashed_password', sa.String(length=1024), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('is_superuser', sa.Boolean(), nullable=False),
sa.Column('is_verified', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')
op.drop_table('transaction')
# ### end Alembic commands ###

0
backend/app/api/.keep Normal file
View File

View File

View File

@@ -1,9 +1,10 @@
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from .db import User, create_db_and_tables from app.models.user import User
from .schemas import UserCreate, UserRead, UserUpdate
from .users import auth_backend, current_active_verified_user, fastapi_users from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users
app = FastAPI() app = FastAPI()
@@ -53,9 +54,3 @@ async def root():
@app.get("/authenticated-route") @app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_verified_user)): async def authenticated_route(user: User = Depends(current_active_verified_user)):
return {"message": f"Hello {user.email}!"} return {"message": f"Hello {user.email}!"}
@app.on_event("startup")
async def on_startup():
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()

2
backend/app/core/.keep Normal file
View File

@@ -0,0 +1,2 @@
# Konfigurace a utility

View File

4
backend/app/core/base.py Normal file
View File

@@ -0,0 +1,4 @@
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
Base: DeclarativeMeta = declarative_base()

View File

@@ -1,10 +1,6 @@
import os import os
from typing import AsyncGenerator from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from app.core.base import Base # Import Base z nového souboru
from fastapi import Depends
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
DATABASE_URL = os.getenv("DATABASE_URL") DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL: if not DATABASE_URL:
@@ -13,21 +9,20 @@ if not DATABASE_URL:
mariadb_db = os.getenv("MARIADB_DB", "group_project") mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root") mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword") mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
#always use SSL except for localhost - i dont want to include certs
if mariadb_host and mariadb_db and mariadb_user and mariadb_password: if mariadb_host and mariadb_db and mariadb_user and mariadb_password:
# Use MariaDB/MySQL over async driver
DATABASE_URL = f"mysql+asyncmy://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}" DATABASE_URL = f"mysql+asyncmy://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
else: else:
raise Exception("Only MariaDB is supported. Please set the DATABASE_URL environment variable.") raise Exception("Only MariaDB is supported. Please set the DATABASE_URL environment variable.")
Base: DeclarativeMeta = declarative_base() # Explicitní import všech modelů, aby byly registrovány v Base.metadata
from app.models.user import User
from app.models.transaction import Transaction
from app.models.user import User
class User(SQLAlchemyBaseUserTableUUID, Base): # Pokud máš další modely, importuj je zde:
pass # from app.models.other_model import OtherModel
# Nastavení connect_args pro SSL pouze pokud není localhost
ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost" ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost"
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {} connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
@@ -38,17 +33,3 @@ engine = create_async_engine(
connect_args=connect_args, connect_args=connect_args,
) )
async_session_maker = async_sessionmaker(engine, expire_on_commit=False) async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@@ -1,7 +1,6 @@
import json import json
import os import os
from typing import Any, Dict from typing import Any, Dict
import asyncio import asyncio
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
@@ -12,11 +11,8 @@ RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
) )
QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue") QUEUE_NAME = os.getenv("MAIL_QUEUE", "mail_queue")
async def _publish_async(message: Dict[str, Any]) -> None: async def _publish_async(message: Dict[str, Any]) -> None:
# Import locally to avoid hard dependency at import-time
import aio_pika import aio_pika
connection = await aio_pika.connect_robust(RABBITMQ_URL) connection = await aio_pika.connect_robust(RABBITMQ_URL)
try: try:
channel = await connection.channel() channel = await connection.channel()
@@ -29,19 +25,12 @@ async def _publish_async(message: Dict[str, Any]) -> None:
finally: finally:
await connection.close() await connection.close()
def enqueue_email(to: str, subject: str, body: str) -> None: def enqueue_email(to: str, subject: str, body: str) -> None:
"""
Enqueue an email to RabbitMQ. If RabbitMQ or aio_pika is not available,
this function will raise ImportError/ConnectionError. The caller may
implement fallback (e.g., direct send).
"""
message = {"type": "email", "to": to, "subject": subject, "body": body} message = {"type": "email", "to": to, "subject": subject, "body": body}
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# Fire-and-forget task so we don't block the request path
loop.create_task(_publish_async(message)) loop.create_task(_publish_async(message))
except RuntimeError: except RuntimeError:
# No running loop (e.g., called from sync context) run a short loop
asyncio.run(_publish_async(message)) asyncio.run(_publish_async(message))
# ...existing code...

2
backend/app/models/.keep Normal file
View File

@@ -0,0 +1,2 @@
# SQLAlchemy modely

View File

View File

@@ -0,0 +1,9 @@
from sqlalchemy import Column, Integer, String, Float
from ..core.db import Base
class Transaction(Base):
__tablename__ = "transaction"
id = Column(Integer, primary_key=True, autoincrement=True)
amount = Column(Float, nullable=False)
description = Column(String(length=255), nullable=True)

View File

@@ -0,0 +1,7 @@
from sqlalchemy import Column, String
from fastapi_users.db import SQLAlchemyBaseUserTableUUID
from ..core.base import Base # Import Base z base.py
class User(SQLAlchemyBaseUserTableUUID, Base):
first_name = Column(String(length=100), nullable=True)
last_name = Column(String(length=100), nullable=True)

View File

@@ -1,15 +0,0 @@
import uuid
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
pass
class UserCreate(schemas.BaseUserCreate):
pass
class UserUpdate(schemas.BaseUserUpdate):
pass

View File

@@ -0,0 +1,2 @@
# Pydantic schémata

View File

View File

@@ -0,0 +1,16 @@
import uuid
from typing import Optional
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
first_name: Optional[str] = None
surname: Optional[str] = None
class UserCreate(schemas.BaseUserCreate):
first_name: Optional[str] = None
surname: Optional[str] = None
class UserUpdate(schemas.BaseUserUpdate):
first_name: Optional[str] = None
surname: Optional[str] = None

View File

@@ -0,0 +1,2 @@
# Business logika

View File

View File

@@ -0,0 +1,14 @@
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_users.db import SQLAlchemyUserDatabase
from ..core.db import async_session_maker
from ..models.user import User
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@@ -11,19 +11,19 @@ from fastapi_users.authentication import (
) )
from fastapi_users.db import SQLAlchemyUserDatabase from fastapi_users.db import SQLAlchemyUserDatabase
from .db import User, get_user_db from ..models.user import User
from ..services.db import get_user_db
from ..core.queue import enqueue_email
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET") SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173") FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]): class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET reset_password_token_secret = SECRET
verification_token_secret = SECRET verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None): async def on_after_register(self, user: User, request: Optional[Request] = None):
# Ask FastAPI Users to generate a verification token and trigger the hook below
await self.request_verify(user, request) await self.request_verify(user, request)
async def on_after_forgot_password( async def on_after_forgot_password(
@@ -34,7 +34,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
async def on_after_request_verify( async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None self, user: User, token: str, request: Optional[Request] = None
): ):
# Build verification email and send through RabbitMQ (with direct SMTP fallback)
verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}" verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}"
verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}" verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}"
subject = "Ověření účtu" subject = "Ověření účtu"
@@ -47,26 +46,20 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
"Pokud jsi registraci neprováděl(a), tento email ignoruj.\n" "Pokud jsi registraci neprováděl(a), tento email ignoruj.\n"
) )
try: try:
from .queue import enqueue_email
enqueue_email(to=user.email, subject=subject, body=body) enqueue_email(to=user.email, subject=subject, body=body)
except Exception: except Exception:
# Fallback: if queue is unavailable, log the email content (dev fallback)
print("[Email Fallback] To:", user.email) print("[Email Fallback] To:", user.email)
print("[Email Fallback] Subject:", subject) print("[Email Fallback] Subject:", subject)
print("[Email Fallback] Body:\n", body) print("[Email Fallback] Body:\n", body)
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)): async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db) yield UserManager(user_db)
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login") bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy: def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET, lifetime_seconds=3600) return JWTStrategy(secret=SECRET, lifetime_seconds=3600)
auth_backend = AuthenticationBackend( auth_backend = AuthenticationBackend(
name="jwt", name="jwt",
transport=bearer_transport, transport=bearer_transport,
@@ -77,3 +70,4 @@ fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True) current_active_user = fastapi_users.current_user(active=True)
current_active_verified_user = fastapi_users.current_user(active=True, verified=True) current_active_verified_user = fastapi_users.current_user(active=True, verified=True)

View File

@@ -0,0 +1,2 @@
# Background worker

View File

View File

@@ -1,11 +1,12 @@
aio-pika==9.5.6 aio-pika==9.5.6
aiormq==6.8.1 aiormq==6.8.1
aiosqlite==0.21.0 aiosqlite==0.21.0
alembic
annotated-types==0.7.0 annotated-types==0.7.0
anyio==4.11.0 anyio==4.11.0
argon2-cffi==23.1.0 argon2-cffi==23.1.0
argon2-cffi-bindings==25.1.0 argon2-cffi-bindings==25.1.0
asyncmy==0.2.9 asyncmy
bcrypt==4.3.0 bcrypt==4.3.0
cffi==2.0.0 cffi==2.0.0
click==8.1.8 click==8.1.8
@@ -42,3 +43,4 @@ uvloop==0.21.0
watchfiles==1.1.0 watchfiles==1.1.0
websockets==15.0.1 websockets==15.0.1
yarl==1.20.1 yarl==1.20.1
pymysql

View File

@@ -3,7 +3,6 @@ import json
import os import os
from typing import Any, Dict from typing import Any, Dict
RABBITMQ_URL = os.getenv("RABBITMQ_URL") or ( RABBITMQ_URL = os.getenv("RABBITMQ_URL") or (
f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:" f"amqp://{os.getenv('RABBITMQ_USERNAME', 'user')}:"
f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@" f"{os.getenv('RABBITMQ_PASSWORD', 'bitnami123')}@"

11
create_migration.sh Normal file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
if [ -z "$1" ]; then
echo "Usage: $0 <migration_message>"
exit 1
fi
cd backend || { echo "Directory 'backend' does not exist"; exit 1; }
alembic revision --autogenerate -m "$1"
git add alembic/versions/*
cd - || exit

6
upgrade_database.sh Normal file
View File

@@ -0,0 +1,6 @@
#!/bin/bash
# Script to upgrade the database using Alembic
cd backend || { echo "Directory 'backend' does not exist"; exit 1; }
alembic upgrade head