"""Synchronous bank scrapers.

Each scraper fetches transactions for a user over HTTP (a mock bank exposed by
the application itself, or the Ceska sporitelna / Erste sandbox API) and stores
them as ``Transaction`` rows through ``sync_session_maker``.

NOTE(review): nothing in this module de-duplicates against rows stored by an
earlier run - confirm whether the callers or the schema take care of that.
"""
import json
import logging
import os
from collections.abc import Callable
from datetime import datetime
from os.path import dirname, join
from time import strptime  # noqa: F401  (no longer used here; kept so the module's names stay stable)
from uuid import UUID

import httpx
from sqlalchemy import select

from app.core.db import sync_session_maker
from app.models.transaction import Transaction
from app.models.user import User

logger = logging.getLogger(__name__)

OAUTH_DIR = join(dirname(__file__), "..", "oauth")
CERTS = (
    join(OAUTH_DIR, "public_key.pem"),
    join(OAUTH_DIR, "private_key.key"),
)

# Date format used by both the mock bank and the CSAS payloads.
_DATE_FORMAT = "%Y-%m-%d"

_CSAS_ACCOUNTS_URL = (
    "https://webapi.developers.erstegroup.com/api/csas/sandbox/v4/"
    "account-information/my/accounts"
)
# NOTE(review): API key is hard-coded in source (sandbox endpoint); consider
# moving it to configuration before pointing this at a non-sandbox API.
_CSAS_API_KEY = "09fdc637-3c57-4242-95f2-c2205a2438f3"
# One timeout for every CSAS request (previously only the accounts call had it).
_CSAS_TIMEOUT = httpx.Timeout(20.0)


# --------------------------------------------------------------------------- #
# Shared helpers
# --------------------------------------------------------------------------- #
def _parse_user_id(user_id: object) -> UUID | None:
    """Return ``user_id`` as a ``UUID``, or ``None`` (after logging) if invalid."""
    try:
        return UUID(str(user_id))
    except ValueError:
        logger.error("Invalid user_id provided to bank_scraper (sync): %r", user_id)
        return None


def _parse_date(value: str) -> datetime:
    """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

    ``time.strptime`` (used previously) returns a ``struct_time``; this helper
    returns a ``datetime`` instead.
    NOTE(review): assumes ``Transaction.date`` is a Date/DateTime column -
    confirm against the model.

    Raises:
        ValueError: if ``value`` does not match the expected format.
    """
    return datetime.strptime(value, _DATE_FORMAT)


def _get_user(session, user_id: UUID) -> User | None:
    """Load the user with ``user_id`` using ``session``; ``None`` if absent."""
    return session.execute(
        select(User).where(User.id == user_id)
    ).unique().scalar_one_or_none()


def _scrape_all_users(bank_label: str, scrape_one: Callable[[UUID], None]) -> None:
    """Run ``scrape_one`` for every user, logging (not raising) per-user errors.

    Args:
        bank_label: Human-readable bank name interpolated into the log lines.
        scrape_one: Per-user scraper; called with the user's id.
    """
    # Collect plain values while the session is open so it can be released
    # before the (slow) HTTP work starts.
    with sync_session_maker() as session:
        users = session.execute(select(User)).unique().scalars().all()
        targets = [(user.id, getattr(user, "email", None)) for user in users]

    logger.info(
        "[BankScraper] Starting %s scrape for all users | count=%d",
        bank_label,
        len(targets),
    )
    processed = 0
    for uid, email in targets:
        try:
            scrape_one(uid)
        except Exception:
            # Top-level boundary: one failing user must not stop the batch.
            logger.exception(
                "[BankScraper] Error scraping for user id=%s email=%s", uid, email
            )
        else:
            processed += 1
    logger.info(
        "[BankScraper] Finished %s scrape for all users | processed=%d",
        bank_label,
        processed,
    )


# --------------------------------------------------------------------------- #
# Mock bank
# --------------------------------------------------------------------------- #
def load_mock_bank_transactions(user_id: str) -> None:
    """Scrape the mock bank for one user; an invalid ``user_id`` is logged and ignored."""
    uid = _parse_user_id(user_id)
    if uid is None:
        return
    _load_mock_bank_transactions(uid)


def load_all_mock_bank_transactions() -> None:
    """Scrape the mock bank for every user in the database."""
    _scrape_all_users("Mock Bank", _load_mock_bank_transactions)


def _load_mock_bank_transactions(user_id: UUID) -> None:
    """Fetch ``$APP_POD_URL/mock-bank/scrape`` and store the rows for ``user_id``.

    Returns without storing anything when the user does not exist,
    ``APP_POD_URL`` is unset, or the endpoint answers with a non-200 status.

    Raises:
        httpx.HTTPError: on transport failures (not handled here).
        KeyError / ValueError: if a payload row lacks ``amount``/``date`` or the
            date is not ``YYYY-MM-DD``.
    """
    with sync_session_maker() as session:
        if _get_user(session, user_id) is None:
            logger.warning("User not found for id=%s", user_id)
            return

        base_url = os.getenv("APP_POD_URL")
        if not base_url:
            # Previously this silently produced the URL "None/mock-bank/scrape".
            logger.error(
                "[BankScraper] APP_POD_URL is not set; skipping Mock Bank scrape | user_id=%s",
                user_id,
            )
            return

        with httpx.Client() as client:
            response = client.get(f"{base_url}/mock-bank/scrape")
            if response.status_code != httpx.codes.OK:
                logger.warning(
                    "[BankScraper] Mock Bank scrape returned HTTP %s | user_id=%s",
                    response.status_code,
                    user_id,
                )
                return
            payload = response.json()

        session.add_all(
            Transaction(
                amount=row["amount"],
                description=row.get("description"),
                date=_parse_date(row["date"]),
                user_id=user_id,
            )
            for row in payload
        )
        session.commit()


# --------------------------------------------------------------------------- #
# Ceska sporitelna (CSAS)
# --------------------------------------------------------------------------- #
def load_ceska_sporitelna_transactions(user_id: str) -> None:
    """Scrape CSAS for one user; an invalid ``user_id`` is logged and ignored."""
    uid = _parse_user_id(user_id)
    if uid is None:
        return
    _load_ceska_sporitelna_transactions(uid)


def load_all_ceska_sporitelna_transactions() -> None:
    """Scrape CSAS for every user in the database."""
    _scrape_all_users("CSAS", _load_ceska_sporitelna_transactions)


def _csas_headers(access_token: str) -> dict[str, str]:
    """Build the request headers sent with every CSAS call."""
    return {
        "Authorization": f"Bearer {access_token}",
        "WEB-API-key": _CSAS_API_KEY,
        "user-involved": "false",
    }


def _csas_access_token(user: User) -> str | None:
    """Extract the CSAS access token from ``user.config``; ``None`` if not configured.

    ``user.config["csas"]`` is read as a JSON string; a value that is already a
    mapping is accepted as-is.

    Raises:
        json.JSONDecodeError: if the stored string is not valid JSON.
    """
    user_cfg = user.config or {}
    if "csas" not in user_cfg:
        return None
    raw = user_cfg["csas"]
    csas_cfg = json.loads(raw) if isinstance(raw, (str, bytes, bytearray)) else raw
    if not isinstance(csas_cfg, dict):
        return None
    return csas_cfg.get("access_token")


def _csas_transaction(payload: dict, user_id: UUID) -> Transaction | None:
    """Map one CSAS transaction payload to a ``Transaction``.

    Returns ``None`` when the payload carries no amount. Debits
    (``creditDebitIndicator == "DBIT"``) are stored as negative amounts.
    """
    # ``or {}`` (rather than a ``.get`` default) also covers explicit JSON nulls.
    amount = (payload.get("amount") or {}).get("value")
    if amount is None:
        return None
    if payload.get("creditDebitIndicator") == "DBIT":
        amount = -abs(amount)

    details = (payload.get("entryDetails") or {}).get("transactionDetails") or {}
    date_str = (payload.get("bookingDate") or {}).get("date")
    return Transaction(
        amount=amount,
        description=details.get("additionalRemittanceInformation"),
        date=_parse_date(date_str) if date_str else None,
        user_id=user_id,
    )


def _load_ceska_sporitelna_transactions(user_id: UUID) -> None:
    """Fetch the user's CSAS accounts and store their most recent transactions.

    Returns without storing anything when the user does not exist, has no CSAS
    access token configured, or the accounts request fails / is non-200.
    A failing or non-200 transactions request skips only that account.
    """
    with sync_session_maker() as session:
        user = _get_user(session, user_id)
        if user is None:
            logger.warning("User not found for id=%s", user_id)
            return

        access_token = _csas_access_token(user)
        if access_token is None:
            return
        headers = _csas_headers(access_token)

        new_rows: list[Transaction] = []
        # A single client (and connection pool) serves every request of this run.
        with httpx.Client(cert=CERTS, timeout=_CSAS_TIMEOUT) as client:
            try:
                response = client.get(
                    f"{_CSAS_ACCOUNTS_URL}?size=10&page=0&sort=iban&order=desc",
                    headers=headers,
                )
            except httpx.HTTPError:
                logger.exception(
                    "[BankScraper] HTTP error during CSAS request | user_id=%s", user_id
                )
                return
            if response.status_code != httpx.codes.OK:
                return

            for account in response.json().get("accounts") or []:
                acc_id = account.get("id")
                if not acc_id:
                    continue
                try:
                    response = client.get(
                        f"{_CSAS_ACCOUNTS_URL}/{acc_id}/transactions"
                        "?size=100&page=0&sort=bookingdate&order=desc",
                        headers=headers,
                    )
                except httpx.HTTPError:
                    # Same handling as the accounts request, but scoped to one
                    # account so the remaining accounts are still imported.
                    logger.exception(
                        "[BankScraper] HTTP error during CSAS request | user_id=%s",
                        user_id,
                    )
                    continue
                if response.status_code != httpx.codes.OK:
                    continue

                for payload in response.json().get("transactions") or []:
                    row = _csas_transaction(payload, user_id)
                    if row is not None:
                        new_rows.append(row)

        session.add_all(new_rows)
        session.commit()