Compare commits

1 Commits

Author SHA1 Message Date
d6a913a896 feat(worker): add transaction saving to db 2025-10-29 18:11:53 +01:00
14 changed files with 91 additions and 301 deletions

View File

@@ -12,7 +12,25 @@ jobs:
test: test:
name: Run Python Tests name: Run Python Tests
if: github.event.action != 'closed' if: github.event.action != 'closed'
uses: ./.github/workflows/run-tests.yml runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests with pytest
run: pytest
working-directory: ./7project/backend
build: build:
if: github.event.action != 'closed' if: github.event.action != 'closed'

View File

@@ -23,7 +23,26 @@ concurrency:
jobs: jobs:
test: test:
name: Run Python Tests name: Run Python Tests
uses: ./.github/workflows/run-tests.yml if: github.event.action != 'closed'
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests with pytest
run: pytest
working-directory: ./7project/backend
build: build:
name: Build and push image (reusable) name: Build and push image (reusable)

View File

@@ -2,45 +2,47 @@ name: Run Python Tests
permissions: permissions:
contents: read contents: read
# -----------------
# --- Triggers ----
# -----------------
# This section defines when the workflow will run.
on: on:
workflow_call: # Run on every push to the 'main' branch
push:
branches: [ "main", "30-create-tests-and-set-up-a-github-pipeline" ]
# Also run on every pull request that targets the 'main' branch
pull_request:
branches: [ "main" ]
# -----------------
# ------ Jobs -----
# -----------------
# A workflow is made up of one or more jobs that can run in parallel or sequentially.
jobs: jobs:
# A descriptive name for your job
build-and-test: build-and-test:
# Specifies the virtual machine to run the job on. 'ubuntu-latest' is a common and cost-effective choice.
runs-on: ubuntu-latest runs-on: ubuntu-latest
services: # -----------------
mariadb: # ----- Steps -----
image: mariadb:11.4 # -----------------
env: # A sequence of tasks that will be executed as part of the job.
MARIADB_ROOT_PASSWORD: rootpw
MARIADB_DATABASE: group_project
MARIADB_USER: appuser
MARIADB_PASSWORD: apppass
ports:
- 3306:3306
options: >-
--health-cmd="mariadb-admin ping -h 127.0.0.1 -u root -prootpw --silent"
--health-interval=5s
--health-timeout=2s
--health-retries=20
env:
MARIADB_HOST: 127.0.0.1
MARIADB_PORT: "3306"
MARIADB_DB: group_project
MARIADB_USER: appuser
MARIADB_PASSWORD: apppass
steps: steps:
# Step 1: Check out your repository's code
# This action allows the workflow to access your code.
- name: Check out repository code - name: Check out repository code
uses: actions/checkout@v4 uses: actions/checkout@v4
# Step 2: Set up the Python environment
# This action installs a specific version of Python on the runner.
- name: Set up Python 3.11 - name: Set up Python 3.11
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: '3.11' python-version: '3.11' # Use the Python version that matches your project
# Step 3: Install project dependencies
# Runs shell commands to install the libraries listed in your requirements.txt.
- name: Add test dependencies to requirements - name: Add test dependencies to requirements
run: | run: |
echo "pytest==8.4.2" >> ./7project/backend/requirements.txt echo "pytest==8.4.2" >> ./7project/backend/requirements.txt
@@ -51,11 +53,8 @@ jobs:
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r ./7project/backend/requirements.txt pip install -r ./7project/backend/requirements.txt
- name: Run Alembic migrations # Step 4: Run your tests!
run: | # Executes the pytest command to run your test suite.
alembic upgrade head
working-directory: ./7project/backend
- name: Run tests with pytest - name: Run tests with pytest
run: pytest run: pytest
working-directory: ./7project/backend working-directory: ./7project/backend

View File

@@ -25,8 +25,7 @@ if not DATABASE_URL:
SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql") SYNC_DATABASE_URL = DATABASE_URL.replace("+asyncmy", "+pymysql")
host_env = os.getenv("MARIADB_HOST", "localhost") ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost"
ssl_enabled = host_env not in {"localhost", "127.0.0.1"}
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {} connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
def run_migrations_offline() -> None: def run_migrations_offline() -> None:

View File

@@ -24,23 +24,6 @@ async def delete_me(
await user_manager.delete(user) await user_manager.delete(user)
# Keep existing paths as-is under /auth/* and /users/* # Keep existing paths as-is under /auth/* and /users/*
from fastapi import Request, Response
from app.core.security import revoke_token, extract_bearer_token
@router.post(
"/auth/jwt/logout",
status_code=status.HTTP_204_NO_CONTENT,
tags=["auth"],
summary="Log out and revoke current token",
)
async def custom_logout(request: Request) -> Response:
"""Revoke the current bearer token so it cannot be used anymore."""
token = extract_bearer_token(request)
if token:
revoke_token(token)
return Response(status_code=status.HTTP_204_NO_CONTENT)
router.include_router( router.include_router(
fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"] fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"]
) )

View File

@@ -16,8 +16,6 @@ from app.api.csas import router as csas_router
from app.api.categories import router as categories_router from app.api.categories import router as categories_router
from app.api.transactions import router as transactions_router from app.api.transactions import router as transactions_router
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users, get_oauth_provider, UserManager, get_jwt_strategy from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users, get_oauth_provider, UserManager, get_jwt_strategy
from app.core.security import extract_bearer_token, is_token_revoked, decode_and_verify_jwt
from app.services.user_service import SECRET
from fastapi import FastAPI from fastapi import FastAPI
@@ -51,23 +49,6 @@ fastApi.include_router(categories_router)
fastApi.include_router(transactions_router) fastApi.include_router(transactions_router)
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s %(message)s') logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s %(message)s')
@fastApi.middleware("http")
async def auth_guard(request: Request, call_next):
# Enforce revoked/expired JWTs are rejected globally
token = extract_bearer_token(request)
if token:
from fastapi import Response, status as _status
# Deny if token is revoked
if is_token_revoked(token):
return Response(status_code=_status.HTTP_401_UNAUTHORIZED)
# Deny if token is expired or invalid
try:
decode_and_verify_jwt(token, SECRET)
except Exception:
return Response(status_code=_status.HTTP_401_UNAUTHORIZED)
return await call_next(request)
@fastApi.middleware("http") @fastApi.middleware("http")
async def log_traffic(request: Request, call_next): async def log_traffic(request: Request, call_next):
start_time = datetime.now() start_time = datetime.now()

View File

@@ -19,8 +19,7 @@ from app.models.user import User
from app.models.transaction import Transaction from app.models.transaction import Transaction
from app.models.categories import Category from app.models.categories import Category
host_env = os.getenv("MARIADB_HOST", "localhost") ssl_enabled = os.getenv("MARIADB_HOST", "localhost") != "localhost"
ssl_enabled = host_env not in {"localhost", "127.0.0.1"}
connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {} connect_args = {"ssl": {"ssl": True}} if ssl_enabled else {}
engine = create_async_engine( engine = create_async_engine(

View File

@@ -1,52 +0,0 @@
from typing import Optional
import re
import jwt
from fastapi import Request
# Simple in-memory revocation store for revoked JWT tokens.
#
# Limitations:
# - All revoked tokens will be lost if the process restarts (data loss on restart).
# - Not suitable for multi-instance deployments: the revocation list is not shared between instances.
# A token revoked in one instance will not be recognized as revoked in others.
#
# For production, use a persistent and shared store (e.g., Redis or a database).
_REVOKED_TOKENS: set[str] = set()
# Bearer token regex
_BEARER_RE = re.compile(r"^[Bb]earer\s+(.+)$")
def extract_bearer_token(request: Request) -> Optional[str]:
auth = request.headers.get("authorization")
if not auth:
return None
m = _BEARER_RE.match(auth)
if not m:
return None
return m.group(1).strip()
def revoke_token(token: str) -> None:
if token:
_REVOKED_TOKENS.add(token)
def is_token_revoked(token: str) -> bool:
return token in _REVOKED_TOKENS
def decode_and_verify_jwt(token: str, secret: str) -> dict:
"""
Decode the JWT using the shared secret, verifying expiration and signature.
Audience is not verified here to be compatible with fastapi-users default tokens.
Raises jwt.ExpiredSignatureError if expired.
Raises jwt.InvalidTokenError for other issues.
Returns the decoded payload dict on success.
"""
return jwt.decode(
token,
secret,
algorithms=["HS256"],
options={"verify_aud": False},
) # verify_exp is True by default

View File

@@ -1,17 +1,18 @@
import json import json
import logging import logging
from os.path import dirname, join from os.path import dirname, join
from time import strptime
from uuid import UUID from uuid import UUID
import httpx import httpx
from sqlalchemy import select from sqlalchemy import select
from app.core.db import async_session_maker from app.core.db import async_session_maker
from app.models.transaction import Transaction
from app.models.user import User from app.models.user import User
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Reuse CSAS mTLS certs used by OAuth profile calls
OAUTH_DIR = join(dirname(__file__), "..", "oauth") OAUTH_DIR = join(dirname(__file__), "..", "oauth")
CERTS = ( CERTS = (
join(OAUTH_DIR, "public_key.pem"), join(OAUTH_DIR, "public_key.pem"),
@@ -20,10 +21,6 @@ CERTS = (
async def aload_ceska_sporitelna_transactions(user_id: str) -> None: async def aload_ceska_sporitelna_transactions(user_id: str) -> None:
"""
Async entry point to load Česká spořitelna transactions for a single user.
Validates the user_id and performs a minimal placeholder action.
"""
try: try:
uid = UUID(str(user_id)) uid = UUID(str(user_id))
except Exception: except Exception:
@@ -34,9 +31,6 @@ async def aload_ceska_sporitelna_transactions(user_id: str) -> None:
async def aload_all_ceska_sporitelna_transactions() -> None: async def aload_all_ceska_sporitelna_transactions() -> None:
"""
Async entry point to load Česká spořitelna transactions for all users.
"""
async with async_session_maker() as session: async with async_session_maker() as session:
result = await session.execute(select(User)) result = await session.execute(select(User))
users = result.unique().scalars().all() users = result.unique().scalars().all()
@@ -54,7 +48,7 @@ async def aload_all_ceska_sporitelna_transactions() -> None:
async def _aload_ceska_sporitelna_transactions(user_id: UUID) -> None: async def _aload_ceska_sporitelna_transactions(user_id: UUID) -> None:
async with async_session_maker() as session: async with (async_session_maker() as session):
result = await session.execute(select(User).where(User.id == user_id)) result = await session.execute(select(User).where(User.id == user_id))
user: User = result.unique().scalar_one_or_none() user: User = result.unique().scalar_one_or_none()
if user is None: if user is None:
@@ -106,16 +100,22 @@ async def _aload_ceska_sporitelna_transactions(user_id: UUID) -> None:
if response.status_code != httpx.codes.OK: if response.status_code != httpx.codes.OK:
continue continue
# Placeholder: just print the account transactions
transactions = response.json()["transactions"] transactions = response.json()["transactions"]
pass
for transaction in transactions: for transaction in transactions:
#parse and store transaction to database description = transaction.get("entryDetails", {}).get("transactionDetails", {}).get(
#create Transaction object and save to DB "additionalRemittanceInformation")
#obj = date_str = transaction.get("bookingDate", {}).get("date")
date = strptime(date_str, "%Y-%m-%d") if date_str else None
obj = Transaction(
amount=transaction['amount']['value'],
description=description,
date=date,
user_id=user_id,
)
session.add(obj)
await session.commit()
pass pass
pass pass

View File

@@ -1,5 +1,2 @@
[tool.pytest.ini_options] [tool.pytest.ini_options]
pythonpath = "." pythonpath = "."
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"

View File

@@ -1,9 +1,7 @@
import sys import sys
import uuid
import types import types
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
# Stub sentry_sdk to avoid optional dependency issues during import of app # Stub sentry_sdk to avoid optional dependency issues during import of app
stub = types.ModuleType("sentry_sdk") stub = types.ModuleType("sentry_sdk")
@@ -22,23 +20,3 @@ def fastapi_app():
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def client(fastapi_app): def client(fastapi_app):
return TestClient(fastapi_app, raise_server_exceptions=True) return TestClient(fastapi_app, raise_server_exceptions=True)
@pytest.fixture(scope="function")
async def test_user(fastapi_app):
"""
Creates a new user asynchronously and returns their credentials.
Does NOT log them in.
Using AsyncClient with ASGITransport avoids event loop conflicts with DB connections.
"""
unique_email = f"testuser_{uuid.uuid4()}@example.com"
password = "a_strong_password"
user_payload = {"email": unique_email, "password": password}
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
response = await ac.post("/auth/register", json=user_payload)
assert response.status_code == 201
return {"username": unique_email, "password": password}

View File

@@ -1,6 +1,3 @@
import pytest
import uuid
from httpx import AsyncClient, ASGITransport
from fastapi import status from fastapi import status
@@ -16,83 +13,3 @@ def test_e2e_minimal_auth_flow(client):
# 3) Protected endpoint should not be accessible without token # 3) Protected endpoint should not be accessible without token
me = client.get("/users/me") me = client.get("/users/me")
assert me.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN) assert me.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)
@pytest.mark.asyncio
async def test_e2e_full_user_lifecycle(fastapi_app, test_user):
# Use an AsyncClient with ASGITransport for async tests
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
login_payload = test_user
# 1. Log in with the new credentials
login_resp = await ac.post("/auth/jwt/login", data=login_payload)
assert login_resp.status_code == status.HTTP_200_OK
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Access a protected endpoint
me_resp = await ac.get("/users/me", headers=headers)
assert me_resp.status_code == status.HTTP_200_OK
assert me_resp.json()["email"] == test_user["username"]
# 3. Update the user's profile
update_payload = {"first_name": "Test"}
patch_resp = await ac.patch("/users/me", json=update_payload, headers=headers)
assert patch_resp.status_code == status.HTTP_200_OK
assert patch_resp.json()["first_name"] == "Test"
# 4. Log out
logout_resp = await ac.post("/auth/jwt/logout", headers=headers)
assert logout_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
# 5. Verify token is invalid
me_again_resp = await ac.get("/users/me", headers=headers)
assert me_again_resp.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_e2e_transaction_workflow(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app, raise_app_exceptions=True)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get the token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# NEW STEP: Create a category first to get a valid ID
category_payload = {"name": "Test Category for E2E"}
create_category_resp = await ac.post("/categories/create", json=category_payload, headers=headers)
assert create_category_resp.status_code == status.HTTP_201_CREATED
category_id = create_category_resp.json()["id"]
# 2. Create a new transaction
tx_payload = {"amount": -55.40, "description": "Milk and eggs"}
tx_resp = await ac.post("/transactions/create", json=tx_payload, headers=headers)
assert tx_resp.status_code == status.HTTP_201_CREATED
tx_id = tx_resp.json()["id"]
# 3. Assign the category
assign_resp = await ac.post(f"/transactions/{tx_id}/categories/{category_id}", headers=headers)
assert assign_resp.status_code == status.HTTP_200_OK
# 4. Verify assignment
get_tx_resp = await ac.get(f"/transactions/{tx_id}", headers=headers)
assert category_id in get_tx_resp.json()["category_ids"]
# 5. Unassign the category
unassign_resp = await ac.delete(f"/transactions/{tx_id}/categories/{category_id}", headers=headers)
assert unassign_resp.status_code == status.HTTP_200_OK
# 6. Get the transaction again and verify the category is gone
get_tx_again_resp = await ac.get(f"/transactions/{tx_id}", headers=headers)
final_tx_data = get_tx_again_resp.json()
assert category_id not in final_tx_data["category_ids"]
# 7. Delete the transaction for cleanup
delete_resp = await ac.delete(f"/transactions/{tx_id}/delete", headers=headers)
assert delete_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
# NEW STEP: Clean up the created category
delete_category_resp = await ac.delete(f"/categories/{category_id}", headers=headers)
assert delete_category_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)

View File

@@ -1,6 +1,5 @@
from fastapi import status from fastapi import status
import pytest import pytest
from httpx import AsyncClient, ASGITransport
def test_root_ok(client): def test_root_ok(client):
@@ -14,53 +13,6 @@ def test_authenticated_route_requires_auth(client):
assert resp.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN) assert resp.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)
@pytest.mark.asyncio def test_sentry_debug_raises_exception(client):
async def test_create_and_get_category(fastapi_app, test_user): with pytest.raises(ZeroDivisionError):
# Use AsyncClient for async tests client.get("/sentry-debug")
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get an auth token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Define and create the new category
category_name = "Async Integration Test"
category_payload = {"name": category_name}
create_resp = await ac.post("/categories/create", json=category_payload, headers=headers)
# 3. Assert creation was successful
assert create_resp.status_code == status.HTTP_201_CREATED
created_data = create_resp.json()
category_id = created_data["id"]
assert created_data["name"] == category_name
# 4. GET the list of categories to verify
list_resp = await ac.get("/categories/", headers=headers)
assert list_resp.status_code == status.HTTP_200_OK
# 5. Check that our new category is in the list
categories_list = list_resp.json()
assert any(cat["name"] == category_name for cat in categories_list)
delete_resp = await ac.delete(f"/categories/{category_id}", headers=headers)
assert delete_resp.status_code in (status.HTTP_200_OK, status.HTTP_204_NO_CONTENT)
@pytest.mark.asyncio
async def test_create_transaction_missing_amount_fails(fastapi_app, test_user):
transport = ASGITransport(app=fastapi_app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
# 1. Log in to get an auth token
login_resp = await ac.post("/auth/jwt/login", data=test_user)
token = login_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Define an invalid payload
invalid_payload = {"description": "This should fail"}
# 3. Attempt to create the transaction
resp = await ac.post("/transactions/create", json=invalid_payload, headers=headers)
# 4. Assert the expected validation error
assert resp.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

View File

@@ -19,7 +19,7 @@ def test_get_oauth_provider_known_unknown():
def test_get_jwt_strategy_lifetime(): def test_get_jwt_strategy_lifetime():
strategy = user_service.get_jwt_strategy() strategy = user_service.get_jwt_strategy()
assert strategy is not None assert strategy is not None
# Basic smoke check: strategy has a lifetime set to 604800 # Basic smoke check: strategy has a lifetime set to 3600
assert getattr(strategy, "lifetime_seconds", None) in (604800,) assert getattr(strategy, "lifetime_seconds", None) in (604800,)