feat(docs): codebase refactor - added src directory

This commit is contained in:
ribardej
2025-11-13 13:45:41 +01:00
parent 6effb2793a
commit f0b1452e30
157 changed files with 16 additions and 6081 deletions

View File

@@ -0,0 +1,8 @@
FROM python:3.11-trixie
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD alembic upgrade head && uvicorn app.app:fastApi --host 0.0.0.0 --port 8000

View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# sqlalchemy.url = driver://user:pass@localhost/dbname
# Pro async MariaDB bude url brána z proměnné prostředí DATABASE_URL
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1,57 @@
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool, create_engine
from alembic import context
# Add path for correct loading of modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.core.db import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
mariadb_host = os.getenv("MARIADB_HOST", "localhost")
mariadb_port = os.getenv("MARIADB_PORT", "3306")
mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
DATABASE_URL = f"mysql+pymysql://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql")
host_env = os.getenv("MARIADB_HOST", "localhost")
ssl_enabled = host_env not in {"localhost", "127.0.0.1"}
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
def run_migrations_offline() -> None:
context.configure(
url=SYNC_DATABASE_URL,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = create_engine(SYNC_DATABASE_URL, poolclass=pool.NullPool, connect_args=connect_args)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,71 @@
"""add categories
Revision ID: 63e072f09836
Revises:
Create Date: 2025-10-09 14:56:14.653249
"""
from typing import Sequence, Union
import fastapi_users_db_sqlalchemy
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '63e072f09836'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('user',
sa.Column('first_name', sa.String(length=100), nullable=True),
sa.Column('last_name', sa.String(length=100), nullable=True),
sa.Column('id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.Column('email', sa.String(length=320), nullable=False),
sa.Column('hashed_password', sa.String(length=1024), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('is_superuser', sa.Boolean(), nullable=False),
sa.Column('is_verified', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_table('categories',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('user_id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name')
)
op.create_table('transaction',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('amount', sa.Float(), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('user_id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('category_transaction',
sa.Column('id_category', sa.Integer(), nullable=True),
sa.Column('id_transaction', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['id_category'], ['categories.id'], ),
sa.ForeignKeyConstraint(['id_transaction'], ['transaction.id'], )
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('category_transaction')
op.drop_table('transaction')
op.drop_table('categories')
op.drop_index(op.f('ix_user_email'), table_name='user')
op.drop_table('user')
# ### end Alembic commands ###

View File

@@ -0,0 +1,34 @@
"""update categories unique
Revision ID: 390041bd839e
Revises: 63e072f09836
Create Date: 2025-10-09 15:14:31.557686
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '390041bd839e'
down_revision: Union[str, Sequence[str], None] = '63e072f09836'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('name'), table_name='categories')
op.create_unique_constraint('uix_name_user_id', 'categories', ['name', 'user_id'])
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint('uix_name_user_id', 'categories', type_='unique')
op.create_index(op.f('name'), 'categories', ['name'], unique=True)
# ### end Alembic commands ###

View File

@@ -0,0 +1,48 @@
"""add user oauth
Revision ID: 7af8f296d089
Revises: 390041bd839e
Create Date: 2025-10-10 14:05:00.153376
"""
from typing import Sequence, Union
import fastapi_users_db_sqlalchemy
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '7af8f296d089'
down_revision: Union[str, Sequence[str], None] = '390041bd839e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('oauth_account',
sa.Column('id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.Column('user_id', fastapi_users_db_sqlalchemy.generics.GUID(), nullable=False),
sa.Column('oauth_name', sa.String(length=100), nullable=False),
sa.Column('access_token', sa.String(length=1024), nullable=False),
sa.Column('expires_at', sa.Integer(), nullable=True),
sa.Column('refresh_token', sa.String(length=1024), nullable=True),
sa.Column('account_id', sa.String(length=320), nullable=False),
sa.Column('account_email', sa.String(length=320), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ondelete='cascade'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_oauth_account_account_id'), 'oauth_account', ['account_id'], unique=False)
op.create_index(op.f('ix_oauth_account_oauth_name'), 'oauth_account', ['oauth_name'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_oauth_account_oauth_name'), table_name='oauth_account')
op.drop_index(op.f('ix_oauth_account_account_id'), table_name='oauth_account')
op.drop_table('oauth_account')
# ### end Alembic commands ###

View File

@@ -0,0 +1,38 @@
"""change token length
Revision ID: 5ab2e654c96e
Revises: 7af8f296d089
Create Date: 2025-10-11 21:07:41.930470
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision: str = '5ab2e654c96e'
down_revision: Union[str, Sequence[str], None] = '7af8f296d089'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('oauth_account', 'access_token',
existing_type=mysql.VARCHAR(length=1024),
type_=sa.String(length=4096),
existing_nullable=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('oauth_account', 'access_token',
existing_type=sa.String(length=4096),
type_=mysql.VARCHAR(length=1024),
existing_nullable=False)
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""add config to user
Revision ID: eabec90a94fe
Revises: 5ab2e654c96e
Create Date: 2025-10-21 18:56:42.085973
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'eabec90a94fe'
down_revision: Union[str, Sequence[str], None] = '5ab2e654c96e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user', sa.Column('config', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user', 'config')
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""add date to transaction
Revision ID: 1f2a3c4d5e6f
Revises: eabec90a94fe
Create Date: 2025-10-22 16:18:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func
# revision identifiers, used by Alembic.
revision: str = '1f2a3c4d5e6f'
down_revision: Union[str, Sequence[str], None] = 'eabec90a94fe'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema by adding date column with server default current_date."""
op.add_column(
'transaction',
sa.Column('date', sa.Date(), nullable=False, server_default=sa.text('CURRENT_DATE'))
)
def downgrade() -> None:
"""Downgrade schema by removing date column."""
op.drop_column('transaction', 'date')

View File

@@ -0,0 +1,47 @@
"""Add encrypted type
Revision ID: 46b9e702e83f
Revises: 1f2a3c4d5e6f
Create Date: 2025-10-29 13:26:24.568523
"""
from typing import Sequence, Union
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision: str = '46b9e702e83f'
down_revision: Union[str, Sequence[str], None] = '1f2a3c4d5e6f'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('transaction', 'amount',
existing_type=mysql.FLOAT(),
type_=sqlalchemy_utils.types.encrypted.encrypted_type.EncryptedType(),
existing_nullable=False)
op.alter_column('transaction', 'description',
existing_type=mysql.VARCHAR(length=255),
type_=sqlalchemy_utils.types.encrypted.encrypted_type.EncryptedType(),
existing_nullable=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('transaction', 'description',
existing_type=sqlalchemy_utils.types.encrypted.encrypted_type.EncryptedType(),
type_=mysql.VARCHAR(length=255),
existing_nullable=True)
op.alter_column('transaction', 'amount',
existing_type=sqlalchemy_utils.types.encrypted.encrypted_type.EncryptedType(),
type_=mysql.FLOAT(),
existing_nullable=False)
# ### end Alembic commands ###

View File

@@ -0,0 +1,46 @@
"""Cascade categories
Revision ID: 59cebf320c4a
Revises: 46b9e702e83f
Create Date: 2025-10-30 13:42:44.555284
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision: str = '59cebf320c4a'
down_revision: Union[str, Sequence[str], None] = '46b9e702e83f'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('category_transaction', sa.Column('category_id', sa.Integer(), nullable=False))
op.add_column('category_transaction', sa.Column('transaction_id', sa.Integer(), nullable=False))
op.drop_constraint(op.f('category_transaction_ibfk_2'), 'category_transaction', type_='foreignkey')
op.drop_constraint(op.f('category_transaction_ibfk_1'), 'category_transaction', type_='foreignkey')
op.create_foreign_key(None, 'category_transaction', 'transaction', ['transaction_id'], ['id'], ondelete='CASCADE')
op.create_foreign_key(None, 'category_transaction', 'categories', ['category_id'], ['id'], ondelete='CASCADE')
op.drop_column('category_transaction', 'id_category')
op.drop_column('category_transaction', 'id_transaction')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('category_transaction', sa.Column('id_transaction', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True))
op.add_column('category_transaction', sa.Column('id_category', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True))
op.drop_constraint(None, 'category_transaction', type_='foreignkey')
op.drop_constraint(None, 'category_transaction', type_='foreignkey')
op.create_foreign_key(op.f('category_transaction_ibfk_1'), 'category_transaction', 'categories', ['id_category'], ['id'])
op.create_foreign_key(op.f('category_transaction_ibfk_2'), 'category_transaction', 'transaction', ['id_transaction'], ['id'])
op.drop_column('category_transaction', 'transaction_id')
op.drop_column('category_transaction', 'category_id')
# ### end Alembic commands ###

View File

View File

View File

View File

@@ -0,0 +1,66 @@
from fastapi import APIRouter, Depends, status
from fastapi_users import models
from fastapi_users.manager import BaseUserManager
from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services.user_service import auth_backend, fastapi_users
router = APIRouter()
@router.delete(
"/users/me",
status_code=status.HTTP_204_NO_CONTENT,
tags=["users"],
summary="Delete current user",
response_description="The user has been successfully deleted.",
)
async def delete_me(
user: models.UserProtocol = Depends(fastapi_users.current_user(active=True)),
user_manager: BaseUserManager = Depends(fastapi_users.get_user_manager),
):
"""
Delete the currently authenticated user.
"""
await user_manager.delete(user)
# Keep existing paths as-is under /auth/* and /users/*
from fastapi import Request, Response
from app.core.security import revoke_token, extract_bearer_token
@router.post(
"/auth/jwt/logout",
status_code=status.HTTP_204_NO_CONTENT,
tags=["auth"],
summary="Log out and revoke current token",
)
async def custom_logout(request: Request) -> Response:
"""Revoke the current bearer token so it cannot be used anymore."""
token = extract_bearer_token(request)
if token:
revoke_token(token)
return Response(status_code=status.HTTP_204_NO_CONTENT)
router.include_router(
fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"]
)
router.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["auth"],
)
router.include_router(
fastapi_users.get_reset_password_router(),
prefix="/auth",
tags=["auth"],
)
router.include_router(
fastapi_users.get_verify_router(UserRead),
prefix="/auth",
tags=["auth"],
)
router.include_router(
fastapi_users.get_users_router(UserRead, UserUpdate),
prefix="/users",
tags=["users"],
)

View File

@@ -0,0 +1,108 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.categories import Category
from app.schemas.category import CategoryCreate, CategoryRead, CategoryUpdate
from app.services.db import get_async_session
from app.services.user_service import current_active_user
from app.models.user import User
router = APIRouter(prefix="/categories", tags=["categories"])
@router.post("/create", response_model=CategoryRead, status_code=status.HTTP_201_CREATED)
async def create_category(
payload: CategoryCreate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
# Enforce per-user unique name via query to provide 409 feedback
res = await session.execute(
select(Category).where(Category.user_id == user.id, Category.name == payload.name)
)
existing = res.scalar_one_or_none()
if existing:
raise HTTPException(status_code=409, detail="Category with this name already exists")
category = Category(name=payload.name, description=payload.description, user_id=user.id)
session.add(category)
await session.commit()
await session.refresh(category)
return category
@router.get("/", response_model=List[CategoryRead])
async def list_categories(
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(select(Category).where(Category.user_id == user.id))
return list(res.scalars())
@router.patch("/{category_id}", response_model=CategoryRead)
async def update_category(
category_id: int,
payload: CategoryUpdate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Category).where(Category.id == category_id, Category.user_id == user.id)
)
category = res.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# If name changed, check uniqueness per user
if payload.name is not None and payload.name != category.name:
dup = await session.execute(
select(Category.id).where(Category.user_id == user.id, Category.name == payload.name)
)
if dup.scalar_one_or_none() is not None:
raise HTTPException(status_code=409, detail="Category with this name already exists")
category.name = payload.name
if payload.description is not None:
category.description = payload.description
await session.commit()
await session.refresh(category)
return category
@router.get("/{category_id}", response_model=CategoryRead)
async def get_category(
category_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Category).where(Category.id == category_id, Category.user_id == user.id)
)
category = res.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return category
@router.delete("/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_category(
category_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Category.id).where(Category.id == category_id, Category.user_id == user.id)
)
if res.scalar_one_or_none() is None:
raise HTTPException(status_code=404, detail="Category not found")
await session.execute(
delete(Category).where(Category.id == category_id, Category.user_id == user.id)
)
await session.commit()
return None

View File

@@ -0,0 +1,40 @@
import json
import os
from fastapi import APIRouter
from fastapi.params import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.oauth.csas import CSASOAuth
from app.services.db import get_async_session
from app.services.user_service import current_active_user
router = APIRouter(prefix="/auth/csas", tags=["csas"])
CLIENT_ID = os.getenv("CSAS_CLIENT_ID")
CLIENT_SECRET = os.getenv("CSAS_CLIENT_SECRET")
CSAS_OAUTH = CSASOAuth(CLIENT_ID, CLIENT_SECRET)
@router.get("/authorize")
async def csas_authorize():
return {"authorization_url":
await CSAS_OAUTH.get_authorization_url(os.getenv("FRONTEND_DOMAIN_SCHEME") + "/auth/csas/callback")}
@router.get("/callback")
async def csas_callback(code: str, session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user)):
response = await CSAS_OAUTH.get_access_token(code, os.getenv("FRONTEND_DOMAIN_SCHEME") + "/auth/csas/callback")
if not user.config:
user.config = {}
new_dict = user.config.copy()
new_dict["csas"] = json.dumps(response)
user.config = new_dict
await session.commit()
return "OK"

View File

@@ -0,0 +1,66 @@
import os
from typing import List
import httpx
from fastapi import APIRouter, HTTPException, Query, status
router = APIRouter(prefix="/exchange-rates", tags=["exchange-rates"])
@router.get("", status_code=status.HTTP_200_OK)
async def get_exchange_rates(symbols: str = Query("EUR,USD,NOK", description="Comma-separated currency codes to fetch vs CZK")):
"""
Fetch exchange rates from UniRate API on the backend and return CZK-per-target rates.
- Always requests CZK in addition to requested symbols to compute conversion from USD-base.
- Returns a list of {currencyCode, rate} where rate is CZK per 1 unit of the target currency.
"""
api_key = os.getenv("UNIRATE_API_KEY")
if not api_key:
raise HTTPException(status_code=500, detail="Server is not configured with UNIRATE_API_KEY")
# Ensure CZK is included for conversion
requested = [s.strip().upper() for s in symbols.split(",") if s.strip()]
if "CZK" not in requested:
requested.append("CZK")
query_symbols = ",".join(sorted(set(requested)))
url = f"https://unirateapi.com/api/rates?api_key={api_key}&symbols={query_symbols}"
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as client:
resp = await client.get(url)
if resp.status_code != httpx.codes.OK:
raise HTTPException(status_code=502, detail=f"Upstream UniRate error: HTTP {resp.status_code}")
data = resp.json()
except httpx.HTTPError as e:
raise HTTPException(status_code=502, detail=f"Failed to contact UniRate: {str(e)}")
# Validate response structure
rates = data.get("rates") if isinstance(data, dict) else None
base = data.get("base") if isinstance(data, dict) else None
if not rates or base != "USD" or "CZK" not in rates:
# Prefer upstream message when available
detail = data.get("message") if isinstance(data, dict) else None
if not detail and isinstance(data, dict):
err = data.get("error")
if isinstance(err, dict):
detail = err.get("info")
raise HTTPException(status_code=502, detail=detail or "Invalid response from UniRate API")
czk_per_usd = rates["CZK"]
# Build result excluding CZK itself
result = []
for code in requested:
if code == "CZK":
continue
target_per_usd = rates.get(code)
if target_per_usd in (None, 0):
# Skip unavailable or invalid
continue
czk_per_target = czk_per_usd / target_per_usd
result.append({"currencyCode": code, "rate": czk_per_target})
return result

View File

@@ -0,0 +1,116 @@
from datetime import datetime, timedelta
from typing import List, Optional
import random
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field, conint, confloat, validator
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.db import get_async_session
from app.services.user_service import current_active_user
from app.models.user import User
from app.models.transaction import Transaction
from app.models.categories import Category
from app.schemas.transaction import TransactionRead
router = APIRouter(prefix="/mock-bank", tags=["mock-bank"])
class GenerateOptions(BaseModel):
count: conint(strict=True, gt=0) = Field(default=10, description="Number of transactions to generate")
minAmount: confloat(strict=True) = Field(default=-200.0, description="Minimum transaction amount")
maxAmount: confloat(strict=True) = Field(default=200.0, description="Maximum transaction amount")
startDate: Optional[str] = Field(None, description="Earliest date (YYYY-MM-DD)")
endDate: Optional[str] = Field(None, description="Latest date (YYYY-MM-DD)")
categoryIds: List[int] = Field(default_factory=list, description="Optional category IDs to assign randomly")
@validator("maxAmount")
def _validate_amounts(cls, v, values):
min_amt = values.get("minAmount")
if min_amt is not None and v < min_amt:
raise ValueError("maxAmount must be greater than or equal to minAmount")
return v
@validator("endDate")
def _validate_dates(cls, v, values):
sd = values.get("startDate")
if v and sd:
try:
ed = datetime.strptime(v, "%Y-%m-%d").date()
st = datetime.strptime(sd, "%Y-%m-%d").date()
except ValueError:
raise ValueError("Invalid date format, expected YYYY-MM-DD")
if ed < st:
raise ValueError("endDate must be greater than or equal to startDate")
return v
class GeneratedTransaction(BaseModel):
amount: float
date: str # YYYY-MM-DD
category_ids: List[int] = []
description: Optional[str] = None
@router.post("/generate", response_model=List[GeneratedTransaction])
async def generate_mock_transactions(
options: GenerateOptions,
user: User = Depends(current_active_user),
):
# Seed randomness per user to make results less erratic across multiple calls in quick succession
seed = int(datetime.utcnow().timestamp()) ^ int(user.id)
rnd = random.Random(seed)
# Determine date range
if options.startDate:
start_date = datetime.strptime(options.startDate, "%Y-%m-%d").date()
else:
start_date = (datetime.utcnow() - timedelta(days=365)).date()
if options.endDate:
end_date = datetime.strptime(options.endDate, "%Y-%m-%d").date()
else:
end_date = datetime.utcnow().date()
span_days = max(0, (end_date - start_date).days)
results: List[GeneratedTransaction] = []
for _ in range(options.count):
amount = round(rnd.uniform(options.minAmount, options.maxAmount), 2)
# Pick a random date in the inclusive range
rand_day = rnd.randint(0, span_days) if span_days > 0 else 0
tx_date = start_date + timedelta(days=rand_day)
# Pick category randomly from provided list, or empty
if options.categoryIds:
cat = [rnd.choice(options.categoryIds)]
else:
cat = []
# Optional simple description for flavor
desc = None
# Assemble
results.append(GeneratedTransaction(
amount=amount,
date=tx_date.isoformat(),
category_ids=cat,
description=desc,
))
return results
@router.get("/scrape")
async def scrape_mock_bank():
# 80% of the time: nothing to scrape
if random.random() < 0.8:
return []
transactions = []
count = random.randint(1, 10)
for _ in range(count):
transactions.append({
"amount": round(random.uniform(-200.0, 200.0), 2),
"date": (datetime.utcnow().date() - timedelta(days=random.randint(0, 30))).isoformat(),
"description": "Mock transaction",
})
return transactions

View File

@@ -0,0 +1,280 @@
from typing import List, Optional
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, and_, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.transaction import Transaction
from app.models.categories import Category
from app.schemas.transaction import (
TransactionCreate,
TransactionRead,
TransactionUpdate,
)
from app.services.db import get_async_session
from app.services.user_service import current_active_user
from app.models.user import User
router = APIRouter(prefix="/transactions", tags=["transactions"])
def _to_read_model(tx: Transaction) -> TransactionRead:
return TransactionRead(
id=tx.id,
amount=tx.amount,
description=tx.description,
date=tx.date,
category_ids=[c.id for c in (tx.categories or [])],
)
@router.post("/create", response_model=TransactionRead, status_code=status.HTTP_201_CREATED)
async def create_transaction(
payload: TransactionCreate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
# Build transaction; set `date` only if provided to let DB default apply otherwise
tx_kwargs = dict(
amount=payload.amount,
description=payload.description,
user_id=user.id,
)
if payload.date is not None:
parsed_date = payload.date
if isinstance(parsed_date, str):
try:
parsed_date = date.fromisoformat(parsed_date)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format, expected YYYY-MM-DD")
tx_kwargs["date"] = parsed_date
tx = Transaction(**tx_kwargs)
# Attach categories if provided (and owned by user)
if payload.category_ids:
res = await session.execute(
select(Category).where(
Category.user_id == user.id, Category.id.in_(payload.category_ids)
)
)
categories = list(res.scalars())
if len(categories) != len(set(payload.category_ids)):
raise HTTPException(
status_code=400,
detail="Duplicate category IDs provided or one or more categories not found"
)
tx.categories = categories
session.add(tx)
await session.commit()
await session.refresh(tx)
# Ensure categories are loaded
await session.refresh(tx, attribute_names=["categories"])
return _to_read_model(tx)
@router.get("/", response_model=List[TransactionRead])
async def list_transactions(
start_date: Optional[date] = None,
end_date: Optional[date] = None,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
cond = [Transaction.user_id == user.id]
if start_date is not None:
cond.append(Transaction.date >= start_date)
if end_date is not None:
cond.append(Transaction.date <= end_date)
res = await session.execute(
select(Transaction).where(and_(*cond)).order_by(Transaction.date, Transaction.id)
)
txs = list(res.scalars())
# Eagerly load categories for each transaction
for tx in txs:
await session.refresh(tx, attribute_names=["categories"])
return [_to_read_model(tx) for tx in txs]
@router.get("/balance_series")
async def get_balance_series(
start_date: Optional[date] = None,
end_date: Optional[date] = None,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
cond = [Transaction.user_id == user.id]
if start_date is not None:
cond.append(Transaction.date >= start_date)
if end_date is not None:
cond.append(Transaction.date <= end_date)
res = await session.execute(
select(Transaction).where(and_(*cond)).order_by(Transaction.date, Transaction.id)
)
txs = list(res.scalars())
# Group by date and accumulate
daily = {}
for tx in txs:
key = tx.date.isoformat() if hasattr(tx.date, 'isoformat') else str(tx.date)
daily[key] = daily.get(key, 0.0) + float(tx.amount)
# Build cumulative series sorted by date
series = []
running = 0.0
for d in sorted(daily.keys()):
running += daily[d]
series.append({"date": d, "balance": running})
return series
@router.get("/{transaction_id}", response_model=TransactionRead)
async def get_transaction(
transaction_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Transaction).where(
Transaction.id == transaction_id, Transaction.user_id == user.id
)
)
tx: Optional[Transaction] = res.scalar_one_or_none()
if not tx:
raise HTTPException(status_code=404, detail="Transaction not found")
await session.refresh(tx, attribute_names=["categories"])
return _to_read_model(tx)
@router.patch("/{transaction_id}/edit", response_model=TransactionRead)
async def update_transaction(
transaction_id: int,
payload: TransactionUpdate,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Transaction).where(
Transaction.id == transaction_id, Transaction.user_id == user.id
)
)
tx: Optional[Transaction] = res.scalar_one_or_none()
if not tx:
raise HTTPException(status_code=404, detail="Transaction not found")
if payload.amount is not None:
tx.amount = payload.amount
if payload.description is not None:
tx.description = payload.description
if payload.date is not None:
new_date = payload.date
if isinstance(new_date, str):
try:
new_date = date.fromisoformat(new_date)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format, expected YYYY-MM-DD")
tx.date = new_date
if payload.category_ids is not None:
# Preload categories to avoid async lazy-load during assignment
await session.refresh(tx, attribute_names=["categories"])
if payload.category_ids:
# Check for duplicate category IDs in the payload
if len(payload.category_ids) != len(set(payload.category_ids)):
raise HTTPException(status_code=400, detail="Duplicate category IDs in payload")
res = await session.execute(
select(Category).where(
Category.user_id == user.id, Category.id.in_(payload.category_ids)
)
)
categories = list(res.scalars())
if len(categories) != len(payload.category_ids):
raise HTTPException(status_code=400, detail="One or more categories not found")
tx.categories = categories
else:
tx.categories = []
await session.commit()
await session.refresh(tx, attribute_names=["categories"])
return _to_read_model(tx)
@router.delete("/{transaction_id}/delete", status_code=status.HTTP_204_NO_CONTENT)
async def delete_transaction(
transaction_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res = await session.execute(
select(Transaction).where(
Transaction.id == transaction_id, Transaction.user_id == user.id
)
)
tx = res.scalar_one_or_none()
if not tx:
raise HTTPException(status_code=404, detail="Transaction not found")
await session.delete(tx)
await session.commit()
return None
@router.post("/{transaction_id}/categories/{category_id}", response_model=TransactionRead)
async def assign_category(
transaction_id: int,
category_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
# Load transaction and category ensuring ownership
res_tx = await session.execute(
select(Transaction).where(
Transaction.id == transaction_id, Transaction.user_id == user.id
)
)
tx: Optional[Transaction] = res_tx.scalar_one_or_none()
if not tx:
raise HTTPException(status_code=404, detail="Transaction not found")
res_cat = await session.execute(
select(Category).where(Category.id == category_id, Category.user_id == user.id)
)
cat: Optional[Category] = res_cat.scalar_one_or_none()
if not cat:
raise HTTPException(status_code=404, detail="Category not found")
await session.refresh(tx, attribute_names=["categories"])
if cat not in tx.categories:
tx.categories.append(cat)
await session.commit()
await session.refresh(tx, attribute_names=["categories"])
return _to_read_model(tx)
@router.delete("/{transaction_id}/categories/{category_id}", response_model=TransactionRead)
async def unassign_category(
transaction_id: int,
category_id: int,
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
res_tx = await session.execute(
select(Transaction).where(
Transaction.id == transaction_id, Transaction.user_id == user.id
)
)
tx: Optional[Transaction] = res_tx.scalar_one_or_none()
if not tx:
raise HTTPException(status_code=404, detail="Transaction not found")
res_cat = await session.execute(
select(Category).where(Category.id == category_id, Category.user_id == user.id)
)
cat: Optional[Category] = res_cat.scalar_one_or_none()
if not cat:
raise HTTPException(status_code=404, detail="Category not found")
await session.refresh(tx, attribute_names=["categories"])
if cat in tx.categories:
tx.categories.remove(cat)
await session.commit()
await session.refresh(tx, attribute_names=["categories"])
return _to_read_model(tx)

View File

@@ -0,0 +1,176 @@
import json
import logging
import os
import sys
from datetime import datetime
from pythonjsonlogger import jsonlogger
from fastapi import Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator, metrics
from starlette.requests import Request
from app.services.prometheus import number_of_users, number_of_transactions
from app.services import bank_scraper
from app.workers.celery_tasks import load_transactions, load_all_transactions
from app.models.user import User, OAuthAccount
from app.services.user_service import current_active_verified_user
from app.api.auth import router as auth_router
from app.api.csas import router as csas_router
from app.api.categories import router as categories_router
from app.api.transactions import router as transactions_router
from app.api.exchange_rates import router as exchange_rates_router
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users, get_oauth_provider, \
UserManager, get_jwt_strategy
from app.core.security import extract_bearer_token, is_token_revoked, decode_and_verify_jwt
from app.services.user_service import SECRET
from fastapi import FastAPI
import sentry_sdk
from fastapi_users.db import SQLAlchemyUserDatabase
from app.core.db import async_session_maker, engine
from app.core.base import Base
sentry_sdk.init(
dsn=os.getenv("SENTRY_DSN"),
send_default_pii=True,
)
fastApi = FastAPI()
# CORS for frontend dev server
fastApi.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
"http://127.0.0.1:5173",
os.getenv("FRONTEND_DOMAIN_SCHEME", "")
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if not os.getenv("PYTEST_RUN_CONFIG"):
prometheus = Instrumentator().instrument(fastApi)
# Register custom metrics
prometheus.add(number_of_users()).add(number_of_transactions())
prometheus.expose(
fastApi,
endpoint="/metrics",
include_in_schema=True,
)
fastApi.include_router(auth_router)
fastApi.include_router(categories_router)
fastApi.include_router(transactions_router)
fastApi.include_router(exchange_rates_router)
from app.api.mock_bank import router as mock_bank_router
fastApi.include_router(mock_bank_router)
for h in list(logging.root.handlers):
logging.root.removeHandler(h)
_log_handler = logging.StreamHandler(sys.stdout)
_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(name)s %(message)s %(pathname)s %(lineno)d %(process)d %(thread)d'
)
_log_handler.setFormatter(_formatter)
logging.root.setLevel(logging.INFO)
logging.root.addHandler(_log_handler)
for _name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
_logger = logging.getLogger(_name)
_logger.handlers = [_log_handler]
_logger.propagate = True
@fastApi.middleware("http")
async def auth_guard(request: Request, call_next):
# Enforce revoked/expired JWTs are rejected globally
token = extract_bearer_token(request)
if token:
from fastapi import Response, status as _status
# Deny if token is revoked
if is_token_revoked(token):
return Response(status_code=_status.HTTP_401_UNAUTHORIZED)
# Deny if token is expired or invalid
try:
decode_and_verify_jwt(token, SECRET)
except Exception:
return Response(status_code=_status.HTTP_401_UNAUTHORIZED)
return await call_next(request)
@fastApi.middleware("http")
async def log_traffic(request: Request, call_next):
start_time = datetime.now()
response = await call_next(request)
process_time = (datetime.now() - start_time).total_seconds()
client_host = request.client.host
log_params = {
"request_method": request.method,
"request_url": str(request.url),
"request_size": request.headers.get("content-length"),
"request_headers": dict(request.headers),
"response_status": response.status_code,
"response_size": response.headers.get("content-length"),
"response_headers": dict(response.headers),
"process_time": process_time,
"client_host": client_host
}
logging.getLogger(__name__).info("http_request", extra=log_params)
return response
fastApi.include_router(
fastapi_users.get_oauth_router(
get_oauth_provider("MojeID"),
auth_backend,
"SECRET",
associate_by_email=True,
redirect_url=os.getenv("FRONTEND_DOMAIN_SCHEME", "http://localhost:3000") + "/auth/mojeid/callback",
),
prefix="/auth/mojeid",
tags=["auth"],
)
fastApi.include_router(
fastapi_users.get_oauth_router(
get_oauth_provider("BankID"),
auth_backend,
"SECRET",
associate_by_email=True,
redirect_url=os.getenv("FRONTEND_DOMAIN_SCHEME", "http://localhost:3000") + "/auth/bankid/callback",
),
prefix="/auth/bankid",
tags=["auth"],
)
fastApi.include_router(csas_router)
# Liveness/root endpoint
@fastApi.get("/", include_in_schema=False)
async def root():
return {"status": "ok"}
@fastApi.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_verified_user)):
return {"message": f"Hello {user.email}!"}
@fastApi.get("/_cron", include_in_schema=False)
async def handle_cron(request: Request):
# endpoint accessed by Clodflare => return 404
if request.headers.get("cf-connecting-ip"):
raise HTTPException(status_code=404)
logging.info("[Cron] Triggering scheduled tasks via HTTP endpoint")
task = load_all_transactions.delay()
return {"status": "queued", "action": "csas_scrape_all", "task_id": getattr(task, 'id', None)}

View File

@@ -0,0 +1,50 @@
import os
from celery import Celery
if os.getenv("RABBITMQ_URL"):
RABBITMQ_URL = os.getenv("RABBITMQ_URL") # type: ignore
else:
from urllib.parse import quote
username = os.getenv("RABBITMQ_USERNAME", "user")
password = os.getenv("RABBITMQ_PASSWORD", "bitnami123")
host = os.getenv("RABBITMQ_HOST", "localhost")
port = os.getenv("RABBITMQ_PORT", "5672")
vhost = os.getenv("RABBITMQ_VHOST", "/")
use_ssl = os.getenv("RABBITMQ_USE_SSL", "0").lower() in {"1", "true", "yes"}
scheme = "amqps" if use_ssl else "amqp"
# Kombu uses '//' to denote the default '/' vhost. For custom vhosts, URL-encode them.
if vhost in ("/", ""):
vhost_path = "/" # will become '//' after concatenation below
else:
vhost_path = f"/{quote(vhost, safe='')}"
# Ensure we end up with e.g. amqp://user:pass@host:5672// (for '/')
RABBITMQ_URL = f"{scheme}://{username}:{password}@{host}:{port}{vhost_path}"
if vhost in ("/", "") and not RABBITMQ_URL.endswith("//"):
RABBITMQ_URL += "/"
DEFAULT_QUEUE = os.getenv("MAIL_QUEUE", "mail_queue")
CELERY_BACKEND = os.getenv("CELERY_BACKEND", "rpc://")
celery_app = Celery(
"app",
broker=RABBITMQ_URL,
# backend=CELERY_BACKEND,
)
celery_app.autodiscover_tasks(["app.workers"], related_name="celery_tasks") # discover app.workers.celery_tasks
celery_app.set_default()
celery_app.conf.update(
task_default_queue=DEFAULT_QUEUE,
task_acks_late=True,
worker_prefetch_multiplier=int(os.getenv("CELERY_PREFETCH", "1")),
task_serializer="json",
result_serializer="json",
accept_content=["json"],
)
__all__ = ["celery_app"]

View File

@@ -0,0 +1,4 @@
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
Base: DeclarativeMeta = declarative_base()

View File

@@ -0,0 +1,45 @@
import os
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.base import Base
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
mariadb_host = os.getenv("MARIADB_HOST", "localhost")
mariadb_port = os.getenv("MARIADB_PORT", "3306")
mariadb_db = os.getenv("MARIADB_DB", "group_project")
mariadb_user = os.getenv("MARIADB_USER", "root")
mariadb_password = os.getenv("MARIADB_PASSWORD", "strongpassword")
if mariadb_host and mariadb_db and mariadb_user and mariadb_password:
DATABASE_URL = f"mysql+asyncmy://{mariadb_user}:{mariadb_password}@{mariadb_host}:{mariadb_port}/{mariadb_db}"
else:
raise Exception("Only MariaDB is supported. Please set the DATABASE_URL environment variable.")
# Load all models to register them
from app.models.user import User
from app.models.transaction import Transaction
from app.models.categories import Category
host_env = os.getenv("MARIADB_HOST", "localhost")
ssl_enabled = host_env not in {"localhost", "127.0.0.1"}
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
# Async engine/session for the async parts of the app
engine = create_async_engine(
DATABASE_URL,
pool_pre_ping=True,
echo=os.getenv("SQL_ECHO", "0") == "1",
connect_args=connect_args,
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
# Synchronous engine/session for sync utilities (e.g., bank_scraper)
SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql")
engine_sync = create_engine(
SYNC_DATABASE_URL,
pool_pre_ping=True,
echo=os.getenv("SQL_ECHO", "0") == "1",
connect_args=connect_args,
)
sync_session_maker = sessionmaker(bind=engine_sync, expire_on_commit=False)

View File

@@ -0,0 +1,6 @@
import app.celery_app # noqa: F401
from app.workers.celery_tasks import send_email
def enqueue_email(to: str, subject: str, body: str) -> None:
send_email.delay(to, subject, body)

View File

@@ -0,0 +1,52 @@
from typing import Optional
import re
import jwt
from fastapi import Request
# Simple in-memory revocation store for revoked JWT tokens.
#
# Limitations:
# - All revoked tokens will be lost if the process restarts (data loss on restart).
# - Not suitable for multi-instance deployments: the revocation list is not shared between instances.
# A token revoked in one instance will not be recognized as revoked in others.
#
# For production, use a persistent and shared store (e.g., Redis or a database).
_REVOKED_TOKENS: set[str] = set()
# Bearer token regex
_BEARER_RE = re.compile(r"^[Bb]earer\s+(.+)$")
def extract_bearer_token(request: Request) -> Optional[str]:
auth = request.headers.get("authorization")
if not auth:
return None
m = _BEARER_RE.match(auth)
if not m:
return None
return m.group(1).strip()
def revoke_token(token: str) -> None:
if token:
_REVOKED_TOKENS.add(token)
def is_token_revoked(token: str) -> bool:
return token in _REVOKED_TOKENS
def decode_and_verify_jwt(token: str, secret: str) -> dict:
"""
Decode the JWT using the shared secret, verifying expiration and signature.
Audience is not verified here to be compatible with fastapi-users default tokens.
Raises jwt.ExpiredSignatureError if expired.
Raises jwt.InvalidTokenError for other issues.
Returns the decoded payload dict on success.
"""
return jwt.decode(
token,
secret,
algorithms=["HS256"],
options={"verify_aud": False},
) # verify_exp is True by default

View File

@@ -0,0 +1,25 @@
from fastapi_users_db_sqlalchemy import GUID
from sqlalchemy import Column, Integer, String, ForeignKey, Table, UniqueConstraint
from sqlalchemy.orm import relationship
from app.core.base import Base
association_table = Table(
"category_transaction",
Base.metadata,
Column("category_id", Integer, ForeignKey("categories.id", ondelete="CASCADE"), primary_key=True),
Column("transaction_id", Integer, ForeignKey("transaction.id", ondelete="CASCADE"), primary_key=True)
)
class Category(Base):
__tablename__ = "categories"
__table_args__ = (
UniqueConstraint("name", "user_id", name="uix_name_user_id"),
)
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(length=100), nullable=False)
description = Column(String(length=255), nullable=True)
user_id = Column(GUID, ForeignKey("user.id"), nullable=False)
user = relationship("User", back_populates="categories")
transactions = relationship("Transaction", secondary=association_table, back_populates="categories")

View File

@@ -0,0 +1,24 @@
import os
from fastapi_users_db_sqlalchemy import GUID
from sqlalchemy import Column, Integer, String, Float, ForeignKey, Date, func
from sqlalchemy.orm import relationship
from sqlalchemy_utils import EncryptedType
from sqlalchemy_utils.types.encrypted.encrypted_type import FernetEngine
from app.core.base import Base
from app.models.categories import association_table
SECRET_KEY = os.environ.get("DB_ENCRYPTION_KEY", "localdev")
class Transaction(Base):
__tablename__ = "transaction"
id = Column(Integer, primary_key=True, autoincrement=True)
amount = Column(EncryptedType(Float, SECRET_KEY, engine=FernetEngine), nullable=False)
description = Column(EncryptedType(String(length=255), SECRET_KEY, engine=FernetEngine), nullable=True)
date = Column(Date, nullable=False, server_default=func.current_date())
user_id = Column(GUID, ForeignKey("user.id"), nullable=False)
# Relationship
user = relationship("User", back_populates="transactions")
categories = relationship("Category", secondary=association_table, back_populates="transactions", passive_deletes=True)

View File

@@ -0,0 +1,22 @@
from sqlalchemy import Column, String
from sqlalchemy.orm import relationship, mapped_column, Mapped
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyBaseOAuthAccountTableUUID
from sqlalchemy.sql.sqltypes import JSON
from app.core.base import Base
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
# BankID token is longer than default
access_token: Mapped[str] = mapped_column(String(length=4096), nullable=False)
class User(SQLAlchemyBaseUserTableUUID, Base):
first_name = Column(String(length=100), nullable=True)
last_name = Column(String(length=100), nullable=True)
oauth_accounts = relationship("OAuthAccount", lazy="joined")
config = Column(JSON, default={})
# Relationship
transactions = relationship("Transaction", back_populates="user")
categories = relationship("Category", back_populates="user")

View File

@@ -0,0 +1,50 @@
import secrets
from typing import Optional, Literal
from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class BankID(CustomOpenID):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
"https://oidc.sandbox.bankid.cz/.well-known/openid-configuration",
"BankID",
base_scopes=["openid", "profile.email", "profile.name"],
)
async def get_user_info(self, token: str) -> dict:
info = await self.get_profile(token)
return {
"first_name": info.get("given_name"),
"last_name": info.get("family_name"),
}
async def get_authorization_url(
self,
redirect_uri: str,
state: Optional[str] = None,
scope: Optional[list[str]] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[Literal["plain", "S256"]] = None,
extras_params: Optional[T] = None,
) -> str:
if extras_params is None:
extras_params = {}
# BankID requires random nonce parameter for security
# https://developer.bankid.cz/docs/security_sep
extras_params["nonce"] = secrets.token_urlsafe()
return await super().get_authorization_url(
redirect_uri,
state,
scope,
code_challenge,
code_challenge_method,
extras_params,
)

View File

@@ -0,0 +1,33 @@
import os
from os.path import dirname, join
from typing import Optional, Any
import httpx
from httpx_oauth.exceptions import GetProfileError
from httpx_oauth.oauth2 import BaseOAuth2
import app.services.db
BASE_DIR = dirname(__file__)
certs = (
join(BASE_DIR, "public_key.pem"),
join(BASE_DIR, "private_key.key")
)
class CSASOAuth(BaseOAuth2):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
base_scopes=["aisp"],
authorize_endpoint="https://webapi.developers.erstegroup.com/api/csas/sandbox/v1/sandbox-idp/auth",
access_token_endpoint="https://webapi.developers.erstegroup.com/api/csas/sandbox/v1/sandbox-idp/token",
refresh_token_endpoint="https://webapi.developers.erstegroup.com/api/csas/sandbox/v1/sandbox-idp/token"
)

View File

@@ -0,0 +1,6 @@
from httpx_oauth.clients.openid import OpenID
class CustomOpenID(OpenID):
async def get_user_info(self, token: str) -> dict:
raise NotImplementedError()

View File

@@ -0,0 +1,56 @@
import json
from typing import Optional, Literal, Any
from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class MojeIDOAuth(CustomOpenID):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
"https://mojeid.cz/.well-known/openid-configuration/",
"MojeID",
base_scopes=["openid", "email", "profile"],
)
async def get_user_info(self, token: str) -> Optional[Any]:
info = await self.get_profile(token)
return {
"first_name": info.get("given_name"),
"last_name": info.get("family_name"),
}
async def get_authorization_url(
self,
redirect_uri: str,
state: Optional[str] = None,
scope: Optional[list[str]] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[Literal["plain", "S256"]] = None,
extras_params: Optional[T] = None,
) -> str:
required_fields = {
'id_token': {
'name': {'essential': True},
'given_name': {'essential': True},
'family_name': {'essential': True},
'email': {'essential': True},
'mojeid_valid': {'essential': True},
}}
if extras_params is None:
extras_params = {}
extras_params["claims"] = json.dumps(required_fields)
return await super().get_authorization_url(
redirect_uri,
state,
scope,
code_challenge,
code_challenge_method,
extras_params,
)

View File

@@ -0,0 +1,28 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDcr/oxgV074ETd
DkP/0l8LFnRofru+m2wNNG/ttVCioTqwnvR4oYxwq3U9qIBsT0D+Rx/Ef7qcpzqf
/w9xt6Hosdv6I5jMHGaVQqLiPuV26/a7WvcmU+PpYuEBmbBHjGVJRBwgPtlUW1VL
M8Pht9YiaagEKvFa6SUidZLfPv+ECohqgH4mgMrEcG/BTnry0/5xQdadRC9o25cl
NtZIesS5GPeelhggFTkbh/FaxvMXhIAaRXT61cnxgxtfM71h5ObX5Lwle9z5a+Tw
xgQhSQq1jbHALYvTwsc4Q/NQGXpGNWy599sb7dg5AkPFSSF4ceXBo/2jOaZCqWrt
FVONZ+blAgMBAAECggEBAJwQbrRXsaFIRiq1jez5znC+3m+PQCHZM55a+NR3pqB7
uE9y+ZvdUr3S4sRJxxfRLDsl/Rcu5L8nm9PNwhQ/MmamcNQCHGoro3fmed3ZcNia
og94ktMt/DztygUhtIHEjVQ0sFc1WufG9xiJcPrM0MfhRAo+fBQ4UCSAVO8/U98B
a4yukrPNeEA03hyjLB9W41pNQfyOtAHqzwDg9Q5XVaGMCLZT1bjCIquUcht5iMva
tiw3cwdiYIklLTzTCsPPK9A/AlWZyUXL8KxtN0mU0kkwlXqASoXZ2nqdkhjRye/V
3JXOmlDtDaJCqWDpH2gHLxMCl7OjfPvuD66bAT3H63kCgYEA5zxW/l6oI3gwYW7+
j6rEjA2n8LikVnyW2e/PZ7pxBH3iBFe2DHx/imeqd/0IzixcM1zZT/V+PTFPQizG
lOU7stN6Zg/LuRdxneHPyLWCimJP7BBJCWyJkuxKy9psokyBhGSLR/phL3fP7UkB
o2I3vGmTFu5A0FzXcNH/cXPMdy8CgYEA9FJw3kyzXlInhJ6Cd63mckLPLYDArUsm
THBoeH2CVTBS5g0bCbl7N1ZxUoYwZPD4lg5V0nWhZALGf+85ULSjX03PMf1cc6WW
EIbZIo9hX+mGRa/FudDd+TlbtBnn0jucwABuLQi9mIepE55Hu9tw5/FT3cHeZVQc
cC0T6ulVvisCgYBCzFeFG+sOdAXl356B+h7VJozBKVWv9kXNp00O9fj4BzVnc78P
VFezr8a66snEZWQtIkFUq+JP4xK2VyD2mlHoktbk7OM5EOCtbzILFQQk3cmgtAOl
SUlkvAXPZcXEDL3NdQ4XOOkiQUY7kb97Z0AamZT4JtNqXaeO29si9wS12QKBgHYg
Hd3864Qg6GZgVOgUNiTsVErFw2KFwQCYIIqQ9CDH+myrzXTILuC0dJnXszI6p5W1
XJ0irmMyTFKykN2KWKrNbe3Xd4mad5GKARWKiSPcPkUXFNwgNhI3PzU2iTTGCaVz
D9HKNhC3FnIbxsb29AHQViITh7kqD43U3ZpoMkJ9AoGAZ+sg+CPfuo3ZMpbcdb3B
ZX2UhAvNKxgHvNnHOjO+pvaM7HiH+BT0650brfBWQ0nTG1dt18mCevVk1UM/5hO9
AtZw06vCLOJ3p3qpgkSlRZ1H7VokG9M8Od0zXqtJrmeLeBq7dfuDisYOuA+NUEbJ
UM/UHByieS6ywetruz0LpM0=
-----END RSA PRIVATE KEY-----

View File

@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFSTCCAzGgAwIBAgIEAQIDBDANBgkqhkiG9w0BAQsFADCBgDELMAkGA1UEBhMC
Q1oxDjAMBgNVBAcTBUN6ZWNoMRMwEQYDVQQKEwpFcnN0ZUdyb3VwMRUwEwYDVQQL
EwxFcnN0ZUh1YlRlYW0xETAPBgNVBAMTCEVyc3RlSHViMSIwIAYJKoZIhvcNAQkB
FhNpbmZvQGVyc3RlZ3JvdXAuY29tMB4XDTIyMTIxNDA4MDc1N1oXDTI2MDMxNDA4
MDc1N1owUjEaMBgGA1UEYRMRUFNEQ1otQ05CLTEyMzQ1NjcxCzAJBgNVBAYTAkNa
MRYwFAYDVQQDEw1UUFAgVGVzdCBRV0FDMQ8wDQYDVQQKEwZNeSBUUFAwggEiMA0G
CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDcr/oxgV074ETdDkP/0l8LFnRofru+
m2wNNG/ttVCioTqwnvR4oYxwq3U9qIBsT0D+Rx/Ef7qcpzqf/w9xt6Hosdv6I5jM
HGaVQqLiPuV26/a7WvcmU+PpYuEBmbBHjGVJRBwgPtlUW1VLM8Pht9YiaagEKvFa
6SUidZLfPv+ECohqgH4mgMrEcG/BTnry0/5xQdadRC9o25clNtZIesS5GPeelhgg
FTkbh/FaxvMXhIAaRXT61cnxgxtfM71h5ObX5Lwle9z5a+TwxgQhSQq1jbHALYvT
wsc4Q/NQGXpGNWy599sb7dg5AkPFSSF4ceXBo/2jOaZCqWrtFVONZ+blAgMBAAGj
gfcwgfQwCwYDVR0PBAQDAgHGMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcD
AjCBrwYIKwYBBQUHAQMEgaIwgZ8wCAYGBACORgEBMAsGBgQAjkYBAwIBFDAIBgYE
AI5GAQQwEwYGBACORgEGMAkGBwQAjkYBBgMwZwYGBACBmCcCMF0wTDARBgcEAIGY
JwEBDAZQU1BfQVMwEQYHBACBmCcBAgwGUFNQX1BJMBEGBwQAgZgnAQMMBlBTUF9B
STARBgcEAIGYJwEEDAZQU1BfSUMMBUVyc3RlDAZBVC1FUlMwFAYDVR0RBA0wC4IJ
bXl0cHAuY29tMA0GCSqGSIb3DQEBCwUAA4ICAQBlTMPSwz46GMRBEPcy+25gV7xE
5aFS5N6sf3YQyFelRJgPxxPxTHo55WelcK4XmXRQKeQ4VoKf4FgP0Cj74+p0N0gw
wFJDWPGXH3SdjAXPRtG+FOiHwUSoyrmvbL4kk6Vbrd4cF+qe0BlzHzJ2Q6vFLwsk
NYvWzkY9YjoItB38nAnQhyYgl1yHUK/uDWyrwHVfZn1AeTws/hr/KufORuiQfaTU
kvAH1nzi7WSJ6AIQCd2exUEPx/O14Y+oCoJhTVd+RpA/9lkcqebceBijj47b2bvv
QbjymvyTXqHd3L224Y7zVmh95g+CaJ8PRpApdrImfjfDDRy8PaFWx2pd/v0UQgrQ
lgbO6jE7ah/tS0T5q5JtwnLAiOOqHPaKRvo5WB65jcZ2fvOH/0/oZ89noxp1Ihus
vvsjqc9k2h9Rvt2pEjVU40HtQZ6XCmWqgFwK3n9CHrKNV/GqgANIZRNcvXKMCUoB
VoJORVwi2DF4caKSFmyEWuK+5FyCEILtQ60SY/NHVGsUeOuN7OTjZjECARO6p4hz
Uw+GCIXrzmIjS6ydh/LRef+NK28+xTbjmLHu/wnHg9rrHEnTPd39is+byfS7eeLV
Dld/0Xrv88C0wxz63dcwAceiahjyz2mbQm765tOf9rK7EqsvT5M8EXFJ3dP4zwqS
6mNFoIa0XGbAUT3E1w==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,21 @@
from typing import Optional
from pydantic import BaseModel, ConfigDict
class CategoryBase(BaseModel):
name: str
description: Optional[str] = None
class CategoryCreate(CategoryBase):
pass
class CategoryUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
class CategoryRead(CategoryBase):
id: int
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,26 @@
from typing import List, Optional, Union
from datetime import date
from pydantic import BaseModel, Field, ConfigDict
class TransactionBase(BaseModel):
amount: float = Field(..., gt=-1e18, lt=1e18)
description: Optional[str] = None
# accept either ISO date string or date object
date: Optional[Union[date, str]] = None
class TransactionCreate(TransactionBase):
category_ids: Optional[List[int]] = None
class TransactionUpdate(BaseModel):
amount: Optional[float] = Field(None, gt=-1e18, lt=1e18)
description: Optional[str] = None
# accept either ISO date string or date object
date: Optional[Union[date, str]] = None
category_ids: Optional[List[int]] = None
class TransactionRead(TransactionBase):
id: int
category_ids: List[int] = []
date: Optional[Union[date, str]]
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,17 @@
import uuid
from typing import Optional, Dict, Any
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
first_name: Optional[str] = None
last_name: Optional[str] = None
config: Optional[Dict[str, Any]] = None
class UserCreate(schemas.BaseUserCreate):
first_name: Optional[str] = None
last_name: Optional[str] = None
class UserUpdate(schemas.BaseUserUpdate):
first_name: Optional[str] = None
last_name: Optional[str] = None

View File

@@ -0,0 +1,178 @@
import json
import logging
import os
from os.path import dirname, join
from time import strptime
from uuid import UUID
import httpx
from sqlalchemy import select
from app.core.db import sync_session_maker
from app.models.transaction import Transaction
from app.models.user import User
logger = logging.getLogger(__name__)
OAUTH_DIR = join(dirname(__file__), "..", "oauth")
CERTS = (
join(OAUTH_DIR, "public_key.pem"),
join(OAUTH_DIR, "private_key.key"),
)
def load_mock_bank_transactions(user_id: str) -> None:
try:
uid = UUID(str(user_id))
except Exception:
logger.error("Invalid user_id provided to bank_scraper (sync): %r", user_id)
return
_load_mock_bank_transactions(uid)
def load_all_mock_bank_transactions() -> None:
with sync_session_maker() as session:
users = session.execute(select(User)).unique().scalars().all()
logger.info("[BankScraper] Starting Mock Bank scrape for all users | count=%d", len(users))
processed = 0
for user in users:
try:
_load_mock_bank_transactions(user.id)
processed += 1
except Exception:
logger.exception("[BankScraper] Error scraping for user id=%s email=%s", user.id,
getattr(user, 'email', None))
logger.info("[BankScraper] Finished Mock Bank scrape for all users | processed=%d", processed)
def _load_mock_bank_transactions(user_id: UUID) -> None:
with sync_session_maker() as session:
user: User | None = session.execute(select(User).where(User.id == user_id)).unique().scalar_one_or_none()
if user is None:
logger.warning("User not found for id=%s", user_id)
return
transactions = []
with httpx.Client() as client:
response = client.get(f"{os.getenv('APP_POD_URL')}/mock-bank/scrape")
if response.status_code != httpx.codes.OK:
return
for transaction in response.json():
transactions.append(
Transaction(
amount=transaction["amount"],
description=transaction.get("description"),
date=strptime(transaction["date"], "%Y-%m-%d"),
user_id=user_id,
)
)
for transaction in transactions:
session.add(transaction)
session.commit()
def load_ceska_sporitelna_transactions(user_id: str) -> None:
try:
uid = UUID(str(user_id))
except Exception:
logger.error("Invalid user_id provided to bank_scraper (sync): %r", user_id)
return
_load_ceska_sporitelna_transactions(uid)
def load_all_ceska_sporitelna_transactions() -> None:
with sync_session_maker() as session:
users = session.execute(select(User)).unique().scalars().all()
logger.info("[BankScraper] Starting CSAS scrape for all users | count=%d", len(users))
processed = 0
for user in users:
try:
_load_ceska_sporitelna_transactions(user.id)
processed += 1
except Exception:
logger.exception("[BankScraper] Error scraping for user id=%s email=%s", user.id,
getattr(user, 'email', None))
logger.info("[BankScraper] Finished CSAS scrape for all users | processed=%d", processed)
def _load_ceska_sporitelna_transactions(user_id: UUID) -> None:
with sync_session_maker() as session:
user: User | None = session.execute(select(User).where(User.id == user_id)).unique().scalar_one_or_none()
if user is None:
logger.warning("User not found for id=%s", user_id)
return
cfg = user.config or {}
if "csas" not in cfg:
return
cfg = json.loads(cfg["csas"])
if "access_token" not in cfg:
return
accounts = []
try:
with httpx.Client(cert=CERTS, timeout=httpx.Timeout(20.0)) as client:
response = client.get(
"https://webapi.developers.erstegroup.com/api/csas/sandbox/v4/account-information/my/accounts?size=10&page=0&sort=iban&order=desc",
headers={
"Authorization": f"Bearer {cfg['access_token']}",
"WEB-API-key": "09fdc637-3c57-4242-95f2-c2205a2438f3",
"user-involved": "false",
},
)
if response.status_code != httpx.codes.OK:
return
for account in response.json().get("accounts", []):
accounts.append(account)
except (httpx.HTTPError,) as e:
logger.exception("[BankScraper] HTTP error during CSAS request | user_id=%s", user_id)
return
for account in accounts:
acc_id = account.get("id")
if not acc_id:
continue
url = f"https://webapi.developers.erstegroup.com/api/csas/sandbox/v4/account-information/my/accounts/{acc_id}/transactions?size=100&page=0&sort=bookingdate&order=desc"
with httpx.Client(cert=CERTS) as client:
response = client.get(
url,
headers={
"Authorization": f"Bearer {cfg['access_token']}",
"WEB-API-key": "09fdc637-3c57-4242-95f2-c2205a2438f3",
"user-involved": "false",
},
)
if response.status_code != httpx.codes.OK:
continue
transactions = response.json().get("transactions", [])
for transaction in transactions:
description = transaction.get("entryDetails", {}).get("transactionDetails", {}).get(
"additionalRemittanceInformation")
date_str = transaction.get("bookingDate", {}).get("date")
date = strptime(date_str, "%Y-%m-%d") if date_str else None
amount = transaction.get("amount", {}).get("value")
if transaction.get("creditDebitIndicator") == "DBIT" and amount is not None:
amount = -abs(amount)
if amount is None:
continue
obj = Transaction(
amount=amount,
description=description,
date=date,
user_id=user_id,
)
session.add(obj)
session.commit()

View File

@@ -0,0 +1,16 @@
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_users.db import SQLAlchemyUserDatabase
from ..core.db import async_session_maker
from ..models.user import User, OAuthAccount
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User, OAuthAccount)

View File

@@ -0,0 +1,48 @@
from typing import Callable
from prometheus_fastapi_instrumentator.metrics import Info
from prometheus_client import Gauge
from sqlalchemy import select, func
from app.core.db import async_session_maker
from app.models.transaction import Transaction
from app.models.user import User
def number_of_users() -> Callable[[Info], None]:
METRIC = Gauge(
"number_of_users_total",
"Number of registered users.",
labelnames=("users",)
)
async def instrumentation(info: Info) -> None:
try:
async with async_session_maker() as session:
result = await session.execute(select(func.count(User.id)))
user_count = result.scalar_one() or 0
except Exception:
# In case of DB errors, avoid crashing metrics endpoint
user_count = 0
METRIC.labels(users="total").set(user_count)
return instrumentation
def number_of_transactions() -> Callable[[Info], None]:
METRIC = Gauge(
"number_of_transactions_total",
"Number of transactions stored.",
labelnames=("transactions",)
)
async def instrumentation(info: Info) -> None:
try:
async with async_session_maker() as session:
result = await session.execute(select(func.count()).select_from(Transaction))
transaction_count = result.scalar_one() or 0
except Exception:
# In case of DB errors, avoid crashing metrics endpoint
transaction_count = 0
METRIC.labels(transactions="total").set(transaction_count)
return instrumentation

View File

@@ -0,0 +1,117 @@
import os
import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
)
from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase
from httpx_oauth.oauth2 import BaseOAuth2
from app.models.user import User
from app.oauth.bank_id import BankID
from app.oauth.csas import CSASOAuth
from app.oauth.custom_openid import CustomOpenID
from app.oauth.moje_id import MojeIDOAuth
from app.services.db import get_user_db
from app.core.queue import enqueue_email
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
providers = {
"MojeID": MojeIDOAuth(
os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
),
"BankID": BankID(
os.getenv("BANKID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("BANKID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
),
}
def get_oauth_provider(name: str) -> Optional[BaseOAuth2]:
if name not in providers:
return None
return providers[name]
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def oauth_callback(self: "BaseUserManager[models.UOAP, models.ID]", oauth_name: str, access_token: str,
account_id: str, account_email: str, expires_at: Optional[int] = None,
refresh_token: Optional[str] = None, request: Optional[Request] = None, *,
associate_by_email: bool = False, is_verified_by_default: bool = False) -> models.UOAP:
user = await super().oauth_callback(oauth_name, access_token, account_id, account_email, expires_at,
refresh_token, request, associate_by_email=associate_by_email,
is_verified_by_default=is_verified_by_default)
# set additional user info from the OAuth provider
provider = get_oauth_provider(oauth_name)
if provider is not None and isinstance(provider, CustomOpenID):
update_dict = await provider.get_user_info(access_token)
await self.user_db.update(user, update_dict)
return user
async def on_after_register(self, user: User, request: Optional[Request] = None):
await self.request_verify(user, request)
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
):
verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}"
verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}"
subject = "Ověření účtu"
body = (
"Ahoj,\n\n"
"děkujeme za registraci. Prosíme, ověř svůj účet kliknutím na tento odkaz:\n"
f"{verify_frontend_link}\n\n"
"Pokud by odkaz nefungoval, můžeš použít i přímý odkaz na backend:\n"
f"{verify_backend_link}\n\n"
"Pokud jsi registraci neprováděl(a), tento email ignoruj.\n"
)
try:
enqueue_email(to=user.email, subject=subject, body=body)
except Exception as e:
print("[Email Fallback] To:", user.email)
print("[Email Fallback] Subject:", subject)
print("[Email Fallback] Body:\n", body)
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET, lifetime_seconds=604800)
auth_backend = AuthenticationBackend(
name="jwt",
transport=bearer_transport,
get_strategy=get_jwt_strategy,
)
fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True)
current_active_verified_user = fastapi_users.current_user(active=True, verified=True)

View File

@@ -0,0 +1,86 @@
import logging
import os
import smtplib
from email.message import EmailMessage
import app.services.bank_scraper
from app.celery_app import celery_app
logger = logging.getLogger("celery_tasks")
if not logger.handlers:
_h = logging.StreamHandler()
logger.addHandler(_h)
logger.setLevel(logging.INFO)
@celery_app.task(name="workers.send_email")
def send_email(to: str, subject: str, body: str) -> None:
if not (to and subject and body):
logger.error("Email task missing fields. to=%r subject=%r body_len=%r", to, subject, len(body) if body else 0)
return
host = os.getenv("SMTP_HOST")
if not host:
logger.error("SMTP_HOST is not configured; cannot send email")
return
# Configuration
port = int(os.getenv("SMTP_PORT", "25"))
username = os.getenv("SMTP_USERNAME")
password = os.getenv("SMTP_PASSWORD")
use_tls = os.getenv("SMTP_USE_TLS", "0").lower() in {"1", "true", "yes"}
use_ssl = os.getenv("SMTP_USE_SSL", "0").lower() in {"1", "true", "yes"}
timeout = int(os.getenv("SMTP_TIMEOUT", "10"))
mail_from = os.getenv("SMTP_FROM") or username or "noreply@localhost"
# Build message
msg = EmailMessage()
msg["To"] = to
msg["From"] = mail_from
msg["Subject"] = subject
msg.set_content(body)
try:
if use_ssl:
with smtplib.SMTP_SSL(host=host, port=port, timeout=timeout) as smtp:
if username and password:
smtp.login(username, password)
smtp.send_message(msg)
else:
with smtplib.SMTP(host=host, port=port, timeout=timeout) as smtp:
# STARTTLS if requested
if use_tls:
smtp.starttls()
if username and password:
smtp.login(username, password)
smtp.send_message(msg)
logger.info("[Celery] Email sent | to=%s | subject=%s | body_len=%d", to, subject, len(body))
except Exception:
logger.exception("Failed to send email via SMTP to=%s subject=%s host=%s port=%s tls=%s ssl=%s", to, subject,
host, port, use_tls, use_ssl)
@celery_app.task(name="workers.load_transactions")
def load_transactions(user_id: str) -> None:
if not user_id:
logger.error("Load transactions task missing user_id.")
return
logger.info("[Celery] Starting load_transactions | user_id=%s", user_id)
try:
# Use synchronous bank scraper functions directly, mirroring load_all_transactions
app.services.bank_scraper.load_mock_bank_transactions(user_id)
app.services.bank_scraper.load_ceska_sporitelna_transactions(user_id)
except Exception:
logger.exception("Failed to load transactions for user_id=%s", user_id)
else:
logger.info("[Celery] Finished load_transactions | user_id=%s", user_id)
@celery_app.task(name="workers.load_all_transactions")
def load_all_transactions() -> None:
logger.info("[Celery] Starting load_all_transactions")
# Now use synchronous bank scraper functions directly
app.services.bank_scraper.load_all_mock_bank_transactions()
app.services.bank_scraper.load_all_ceska_sporitelna_transactions()
logger.info("[Celery] Finished load_all_transactions")

View File

@@ -0,0 +1,20 @@
version: "3.9"
services:
mariadb:
image: mariadb:11.4
container_name: test-mariadb
environment:
MARIADB_ROOT_PASSWORD: rootpw
MARIADB_DATABASE: group_project
MARIADB_USER: appuser
MARIADB_PASSWORD: apppass
ports:
- "3307:3306" # host:container (use 3307 on host to avoid conflicts)
healthcheck:
test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1", "-u", "root", "-prootpw", "--silent"]
interval: 5s
timeout: 2s
retries: 20
# Truly ephemeral, fast storage (removed when container stops)
tmpfs:
- /var/lib/mysql

View File

@@ -0,0 +1,4 @@
import uvicorn
if __name__ == "__main__":
uvicorn.run("app.app:app", host="0.0.0.0", log_level="info")

View File

@@ -0,0 +1,5 @@
[tool.pytest.ini_options]
pythonpath = "."
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"

View File

@@ -0,0 +1,73 @@
aio-pika==9.5.6
aiormq==6.8.1
aiosqlite==0.21.0
alembic==1.16.5
amqp==5.3.1
annotated-types==0.7.0
anyio==4.11.0
argon2-cffi==23.1.0
argon2-cffi-bindings==25.1.0
asyncmy==0.2.9
bcrypt==4.3.0
billiard==4.2.2
celery==5.5.3
certifi==2025.10.5
cffi==2.0.0
click==8.1.8
click-didyoumean==0.3.1
click-plugins==1.1.1.2
click-repl==0.3.0
cryptography==46.0.1
dnspython==2.7.0
email_validator==2.2.0
exceptiongroup==1.3.0
fastapi==0.117.1
fastapi-users==14.0.1
fastapi-users-db-sqlalchemy==7.0.0
greenlet==3.2.4
h11==0.16.0
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
httpx-oauth==0.16.1
idna==3.10
kombu==5.5.4
makefun==1.16.0
Mako==1.3.10
MarkupSafe==3.0.2
multidict==6.6.4
packaging==25.0
pamqp==3.3.0
prometheus-fastapi-instrumentator==7.1.0
prometheus_client==0.23.1
prompt_toolkit==3.0.52
propcache==0.3.2
pwdlib==0.2.1
pycparser==2.23
pydantic==2.11.9
pydantic_core==2.33.2
PyJWT==2.10.1
PyMySQL==1.1.2
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-multipart==0.0.20
PyYAML==6.0.2
sentry-sdk==2.42.0
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.43
SQLAlchemy-Utils==0.42.0
starlette==0.48.0
tomli==2.2.1
typing-inspection==0.4.1
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
uvicorn==0.37.0
uvloop==0.21.0
vine==5.1.0
watchfiles==1.1.0
wcwidth==0.2.14
websockets==15.0.1
yarl==1.20.1
python-json-logger==2.0.7

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env bash
set -euo pipefail
# Run tests against a disposable local MariaDB on host port 3307 using Docker Compose.
# Requirements: Docker, docker compose plugin, Python, Alembic, pytest.
# Usage:
# chmod +x ./test_locally.sh
# # From 7project/backend directory
# ./test_locally.sh [--only-unit|--only-integration|--only-e2e] [pytest-args...]
# # Examples:
# ./test_locally.sh --only-unit -q
# ./test_locally.sh --only-integration -k "login"
# ./test_locally.sh --only-e2e -vv
#
# This script will:
# 1) Start a MariaDB 11.4 container (ephemeral storage, port 3307)
# 2) Wait until it's healthy
# 3) Export env vars expected by the app (DATABASE_URL etc.)
# 4) Run Alembic migrations
# 5) Run pytest
# 6) Tear everything down (containers and tmpfs data)
COMPOSE_FILE="docker-compose.test.yml"
SERVICE_NAME="mariadb"
CONTAINER_NAME="test-mariadb"
if ! command -v docker >/dev/null 2>&1; then
echo "Docker is required but not found in PATH" >&2
exit 1
fi
if ! docker compose version >/dev/null 2>&1; then
echo "Docker Compose V2 plugin is required (docker compose)" >&2
exit 1
fi
# Bring up the DB
echo "Starting MariaDB (port 3307) with docker compose..."
docker compose -f "$COMPOSE_FILE" up -d
# Ensure we clean up on exit
cleanup() {
echo "\nTearing down docker compose stack..."
docker compose -f "$COMPOSE_FILE" down -v || true
}
trap cleanup EXIT
# Wait for healthy container
echo -n "Waiting for MariaDB to become healthy"
for i in {1..60}; do
status=$(docker inspect -f '{{.State.Health.Status}}' "$CONTAINER_NAME" 2>/dev/null || echo "")
if [ "$status" = "healthy" ]; then
echo " -> healthy"
break
fi
echo -n "."
sleep 1
if [ $i -eq 60 ]; then
echo "\nMariaDB did not become healthy in time" >&2
exit 1
fi
done
# Export env vars for the app/tests (match app/core/db.py expectations)
export MARIADB_HOST=127.0.0.1
export MARIADB_PORT=3307
export MARIADB_DB=group_project
export MARIADB_USER=appuser
export MARIADB_PASSWORD=apppass
export DATABASE_URL="mysql+asyncmy://$MARIADB_USER:$MARIADB_PASSWORD@$MARIADB_HOST:$MARIADB_PORT/$MARIADB_DB"
export PYTEST_RUN_CONFIG="True"
# Determine which tests to run based on flags
UNIT_TESTS="tests/test_unit_user_service.py"
INTEGRATION_TESTS="tests/test_integration_app.py"
E2E_TESTS="tests/test_e2e.py"
FLAG_COUNT=0
TEST_TARGET=""
declare -a PYTEST_ARGS=()
for arg in "$@"; do
case "$arg" in
--only-unit)
TEST_TARGET="$UNIT_TESTS"; FLAG_COUNT=$((FLAG_COUNT+1));;
--only-integration)
TEST_TARGET="$INTEGRATION_TESTS"; FLAG_COUNT=$((FLAG_COUNT+1));;
--only-e2e)
TEST_TARGET="$E2E_TESTS"; FLAG_COUNT=$((FLAG_COUNT+1));;
*)
PYTEST_ARGS+=("$arg");;
esac
done
if [ "$FLAG_COUNT" -gt 1 ]; then
echo "Error: Use only one of --only-unit, --only-integration, or --only-e2e" >&2
exit 2
fi
# Run Alembic migrations then tests
pushd . >/dev/null
echo "Running Alembic migrations..."
alembic upgrade head
echo "Running pytest..."
if [ -n "$TEST_TARGET" ]; then
# Use "${PYTEST_ARGS[@]:-}" to safely expand empty array with 'set -u'
pytest "$TEST_TARGET" "${PYTEST_ARGS[@]:-}"
else
# Use "${PYTEST_ARGS[@]:-}" to safely expand empty array with 'set -u'
pytest "${PYTEST_ARGS[@]:-}"
fi
popd >/dev/null
# Cleanup handled by trap

View File

@@ -0,0 +1,44 @@
import sys
import uuid
import types
import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
# Stub sentry_sdk to avoid optional dependency issues during import of app
stub = types.ModuleType("sentry_sdk")
stub.init = lambda *args, **kwargs: None
sys.modules.setdefault("sentry_sdk", stub)
# Import the FastAPI application
from app.app import fastApi as app # noqa: E402
@pytest.fixture(scope="session")
def fastapi_app():
return app
@pytest.fixture(scope="session")
def client(fastapi_app):
return TestClient(fastapi_app, raise_server_exceptions=True)
@pytest.fixture(scope="function")
async def test_user(fastapi_app):
"""
Creates a new user asynchronously and returns their credentials.
Does NOT log them in.
Using AsyncClient with ASGITransport avoids event loop conflicts with DB connections.
"""
unique_email = f"testuser_{uuid.uuid4()}@example.com"
password = "a_strong_password"
user_payload = {"email": unique_email, "password": password}
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
response = await ac.post("/auth/register", json=user_payload)
assert response.status_code == 201
return {"username": unique_email, "password": password}

View File

@@ -0,0 +1,210 @@
import pytest
import uuid
from httpx import AsyncClient, ASGITransport
from fastapi import status
def test_e2e(client):
# 1) Service is alive
alive = client.get("/")
assert alive.status_code == status.HTTP_200_OK
# 2) Attempt to login without payload should fail fast (validation error)
login = client.post("/auth/jwt/login")
assert login.status_code in (status.HTTP_400_BAD_REQUEST, status.HTTP_422_UNPROCESSABLE_CONTENT)
# 3) Protected endpoint should not be accessible without token
me = client.get("/users/me")
assert me.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)
@pytest.mark.asyncio
async def test_e2e_full_user_lifecycle(fastapi_app, test_user):
# Use an AsyncClient with ASGITransport for async tests
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
login_payload = test_user
# 1. Log in with the new credentials
login_resp = await ac.post("/auth/jwt/login", data=login_payload)
assert login_resp.status_code == status.HTTP_200_OK
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Access a protected endpoint
me_resp = await ac.get("/users/me", headers=headers)
assert me_resp.status_code == status.HTTP_200_OK
assert me_resp.json()["email"] == test_user["username"]
# 3. Update the user's profile
update_payload = {"first_name": "Test"}
patch_resp = await ac.patch("/users/me", json=update_payload, headers=headers)
assert patch_resp.status_code == status.HTTP_200_OK
assert patch_resp.json()["first_name"] == "Test"
# 4. Log out
logout_resp = await ac.post("/auth/jwt/logout", headers=headers)
assert logout_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
# 5. Verify token is invalid
me_again_resp = await ac.get("/users/me", headers=headers)
assert me_again_resp.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_e2e_transaction_workflow(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get the token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# NEW STEP: Create a category first to get a valid ID
category_payload = {"name": "Test Category for E2E"}
create_category_resp = await ac.post("/categories/create", json=category_payload, headers=headers)
assert create_category_resp.status_code == status.HTTP_201_CREATED
category_id = create_category_resp.json()["id"]
# 2. Create a new transaction
tx_payload = {"amount": -55.40, "description": "Milk and eggs"}
tx_resp = await ac.post("/transactions/create", json=tx_payload, headers=headers)
assert tx_resp.status_code == status.HTTP_201_CREATED
tx_id = tx_resp.json()["id"]
# 3. Assign the category
assign_resp = await ac.post(f"/transactions/{tx_id}/categories/{category_id}", headers=headers)
assert assign_resp.status_code == status.HTTP_200_OK
# 4. Verify assignment
get_tx_resp = await ac.get(f"/transactions/{tx_id}", headers=headers)
assert category_id in get_tx_resp.json()["category_ids"]
# 5. Unassign the category
unassign_resp = await ac.delete(f"/transactions/{tx_id}/categories/{category_id}", headers=headers)
assert unassign_resp.status_code == status.HTTP_200_OK
# 6. Get the transaction again and verify the category is gone
get_tx_again_resp = await ac.get(f"/transactions/{tx_id}", headers=headers)
final_tx_data = get_tx_again_resp.json()
assert category_id not in final_tx_data["category_ids"]
# 7. Delete the transaction for cleanup
delete_resp = await ac.delete(f"/transactions/{tx_id}/delete", headers=headers)
assert delete_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
# NEW STEP: Clean up the created category
delete_category_resp = await ac.delete(f"/categories/{category_id}", headers=headers)
assert delete_category_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
@pytest.mark.asyncio
async def test_register_then_login_and_fetch_me(fastapi_app):
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# Use unique email to avoid duplicates across runs
suffix = uuid.uuid4().hex[:8]
email = f"newuser_{suffix}@example.com"
password = "StrongPassw0rd!"
reg = await ac.post("/auth/register", json={"email": email, "password": password})
assert reg.status_code in (status.HTTP_201_CREATED, status.HTTP_200_OK)
login = await ac.post("/auth/jwt/login", data={"username": email, "password": password})
assert login.status_code == status.HTTP_200_OK
token = login.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
try:
me = await ac.get("/users/me", headers=headers)
assert me.status_code == status.HTTP_200_OK
assert me.json()["email"] == email
finally:
# Cleanup: delete the created user so future runs wont conflict
d = await ac.delete("/users/me", headers=headers)
assert d.status_code == status.HTTP_204_NO_CONTENT
@pytest.mark.asyncio
async def test_delete_current_user_revokes_access(fastapi_app):
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
email = "todelete@example.com"
password = "Passw0rd!"
reg = await ac.post("/auth/register", json={"email": email, "password": password})
assert reg.status_code in (status.HTTP_200_OK, status.HTTP_201_CREATED)
login = await ac.post("/auth/jwt/login", data={"username": email, "password": password})
token = login.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Delete self
d = await ac.delete("/users/me", headers=headers)
assert d.status_code == status.HTTP_204_NO_CONTENT
# Access should now fail
me = await ac.get("/users/me", headers=headers)
assert me.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)
@pytest.mark.asyncio
async def test_update_category_conflict_and_404(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
a = (await ac.post("/categories/create", json={"name": "A"}, headers=h)).json()
b = (await ac.post("/categories/create", json={"name": "B"}, headers=h)).json()
# Attempt to rename A -> B should conflict
conflict = await ac.patch(f"/categories/{a['id']}", json={"name": "B"}, headers=h)
assert conflict.status_code == status.HTTP_409_CONFLICT
# Update non-existent
missing = await ac.patch("/categories/999999", json={"name": "Z"}, headers=h)
assert missing.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_category_cross_user_isolation(fastapi_app):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# Generate unique emails for both users
sfx = uuid.uuid4().hex[:8]
u1 = {"email": f"u1_{sfx}@example.com", "password": "Aaaaaa1!"}
u2 = {"email": f"u2_{sfx}@example.com", "password": "Aaaaaa1!"}
# user1
assert (await ac.post("/auth/register", json=u1)).status_code in (200, 201)
t1 = (await ac.post("/auth/jwt/login", data={"username": u1["email"], "password": u1["password"]})).json()["access_token"]
h1 = {"Authorization": f"Bearer {t1}"}
# user1 creates a category
c = (await ac.post("/categories/create", json={"name": "Private"}, headers=h1)).json()
cat_id = c["id"]
# user2
assert (await ac.post("/auth/register", json=u2)).status_code in (200, 201)
t2 = (await ac.post("/auth/jwt/login", data={"username": u2["email"], "password": u2["password"]})).json()["access_token"]
h2 = {"Authorization": f"Bearer {t2}"}
try:
# user2 cannot read/delete user1's category
g = await ac.get(f"/categories/{cat_id}", headers=h2)
assert g.status_code == status.HTTP_404_NOT_FOUND
d = await ac.delete(f"/categories/{cat_id}", headers=h2)
assert d.status_code == status.HTTP_404_NOT_FOUND
finally:
# Cleanup: remove the created category as its owner
try:
_ = await ac.delete(f"/categories/{cat_id}", headers=h1)
except Exception:
pass
# Cleanup: delete both users to avoid email conflicts later
try:
_ = await ac.delete("/users/me", headers=h1)
except Exception:
pass
try:
_ = await ac.delete("/users/me", headers=h2)
except Exception:
pass

View File

@@ -0,0 +1,159 @@
from fastapi import status
import pytest
from httpx import AsyncClient, ASGITransport
@pytest.mark.asyncio
async def test_create_and_get_category(fastapi_app, test_user):
# Use AsyncClient for async tests
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get an auth token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Define and create the new category
category_name = "Async Integration Test"
category_payload = {"name": category_name}
create_resp = await ac.post("/categories/create", json=category_payload, headers=headers)
# 3. Assert creation was successful
assert create_resp.status_code == status.HTTP_201_CREATED
created_data = create_resp.json()
category_id = created_data["id"]
assert created_data["name"] == category_name
# 4. GET the list of categories to verify
list_resp = await ac.get("/categories/", headers=headers)
assert list_resp.status_code == status.HTTP_200_OK
# 5. Check that our new category is in the list
categories_list = list_resp.json()
assert any(cat["name"] == category_name for cat in categories_list)
delete_resp = await ac.delete(f"/categories/{category_id}", headers=headers)
assert delete_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
@pytest.mark.asyncio
async def test_create_transaction_missing_amount_fails(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get an auth token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Define an invalid payload
invalid_payload = {"description": "This should fail"}
# 3. Attempt to create the transaction
resp = await ac.post("/transactions/create", json=invalid_payload, headers=headers)
# 4. Assert the expected validation error
assert resp.status_code == status.HTTP_422_UNPROCESSABLE_CONTENT
@pytest.mark.asyncio
async def test_login_invalid_credentials(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
bad = await ac.post("/auth/jwt/login", data={"username": test_user["username"], "password": "nope"})
assert bad.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_400_BAD_REQUEST)
unknown = await ac.post("/auth/jwt/login", data={"username": "nouser@example.com", "password": "x"})
assert unknown.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_400_BAD_REQUEST)
@pytest.mark.asyncio
async def test_category_duplicate_name_conflict(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
p = {"name": "Food"}
r1 = await ac.post("/categories/create", json=p, headers=h)
assert r1.status_code == status.HTTP_201_CREATED
r2 = await ac.post("/categories/create", json=p, headers=h)
assert r2.status_code == status.HTTP_409_CONFLICT
@pytest.mark.asyncio
async def test_create_transaction_invalid_date_format(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
bad = await ac.post("/transactions/create", json={"amount": 10, "description": "x", "date": "31-12-2024"}, headers=h)
assert bad.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.asyncio
async def test_update_transaction_rejects_duplicate_category_ids(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
tx = (await ac.post("/transactions/create", json={"amount": 5, "description": "x"}, headers=h)).json()
dup = await ac.patch(f"/transactions/{tx['id']}/edit", json={"category_ids": [1, 1]}, headers=h)
assert dup.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.asyncio
async def test_assign_unassign_category_not_found_cases(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
# Create tx and category
tx = (await ac.post("/transactions/create", json={"amount": 1, "description": "a"}, headers=h)).json()
cat = (await ac.post("/categories/create", json={"name": "X"}, headers=h)).json()
# Missing transaction
r1 = await ac.post(f"/transactions/999999/categories/{cat['id']}", headers=h)
assert r1.status_code == status.HTTP_404_NOT_FOUND
# Missing category
r2 = await ac.post(f"/transactions/{tx['id']}/categories/999999", headers=h)
assert r2.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_transactions_date_filter_and_balance_series(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
# Seed transactions spanning days
data = [
{"amount": 100, "description": "day1", "date": "2024-01-01"},
{"amount": -25, "description": "day2", "date": "2024-01-02"},
{"amount": 50, "description": "day3", "date": "2024-01-03"},
]
for p in data:
r = await ac.post("/transactions/create", json=p, headers=h)
assert r.status_code == status.HTTP_201_CREATED
# Filtered list (2nd and 3rd only)
lst = await ac.get("/transactions/", params={"start_date": "2024-01-02", "end_date": "2024-01-03"}, headers=h)
assert lst.status_code == status.HTTP_200_OK
assert len(lst.json()) == 2
# Balance series should be cumulative per date
series = await ac.get("/transactions/balance_series", headers=h)
assert series.status_code == status.HTTP_200_OK
s = series.json()
assert s == [
{"date": "2024-01-01", "balance": 100.0},
{"date": "2024-01-02", "balance": 75.0},
{"date": "2024-01-03", "balance": 125.0},
]
@pytest.mark.asyncio
async def test_delete_transaction_not_found(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
token = (await ac.post("/auth/jwt/login", data=test_user)).json()["access_token"]
h = {"Authorization": f"Bearer {token}"}
r = await ac.delete("/transactions/9999999/delete", headers=h)
assert r.status_code == status.HTTP_404_NOT_FOUND

View File

@@ -0,0 +1,62 @@
import pytest
from fastapi import status
from app.services import user_service
def test_get_oauth_provider_known_unknown():
# Known providers should return a provider instance
bankid = user_service.get_oauth_provider("BankID")
mojeid = user_service.get_oauth_provider("MojeID")
assert bankid is not None
assert mojeid is not None
# Unknown should return None
assert user_service.get_oauth_provider("DoesNotExist") is None
def test_get_jwt_strategy_lifetime():
strategy = user_service.get_jwt_strategy()
assert strategy is not None
# Basic smoke check: strategy has a lifetime set to 604800
assert getattr(strategy, "lifetime_seconds", None) in (604800,)
def test_root_ok(client):
resp = client.get("/")
assert resp.status_code == status.HTTP_200_OK
assert resp.json() == {"status": "ok"}
def test_authenticated_route_requires_auth(client):
resp = client.get("/authenticated-route")
assert resp.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)
@pytest.mark.asyncio
async def test_on_after_request_verify_enqueues_email(monkeypatch):
calls = {}
def fake_enqueue_email(to: str, subject: str, body: str):
calls.setdefault("emails", []).append({
"to": to,
"subject": subject,
"body": body,
})
# Patch the enqueue_email used inside user_service
monkeypatch.setattr(user_service, "enqueue_email", fake_enqueue_email)
class DummyUser:
def __init__(self, email):
self.email = email
mgr = user_service.UserManager(user_db=None) # user_db not needed for this method
user = DummyUser("test@example.com")
# Call the hook
await mgr.on_after_request_verify(user, token="abc123", request=None)
# Verify one email has been enqueued with expected content
assert len(calls.get("emails", [])) == 1
email = calls["emails"][0]
assert email["to"] == "test@example.com"
assert "ověření účtu" in email["subject"].lower()
assert "abc123" in email["body"]