mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 15:12:08 +01:00
179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
from os.path import dirname, join
|
|
from time import strptime
|
|
from uuid import UUID
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from app.core.db import sync_session_maker
|
|
from app.models.transaction import Transaction
|
|
from app.models.user import User
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Directory "oauth" located one level above the directory containing this file.
OAUTH_DIR = join(dirname(__file__), "..", "oauth")

# Paths of the two PEM/key files inside OAUTH_DIR, in the order
# (public_key.pem, private_key.key); passed as ``cert=`` to httpx clients below.
CERTS = tuple(join(OAUTH_DIR, file_name) for file_name in ("public_key.pem", "private_key.key"))
|
|
|
|
|
|
def load_mock_bank_transactions(user_id: str) -> None:
    """Validate *user_id* as a UUID and run the Mock Bank scrape for that user.

    Args:
        user_id: Value convertible (via ``str``) to a UUID string.

    If the value cannot be converted to a ``UUID`` an error is logged and
    nothing else happens.
    """
    try:
        parsed_id = UUID(str(user_id))
    except Exception:
        logger.error("Invalid user_id provided to bank_scraper (sync): %r", user_id)
    else:
        # Only reached when the conversion succeeded.
        _load_mock_bank_transactions(parsed_id)
|
|
|
|
|
|
def load_all_mock_bank_transactions() -> None:
    """Run the Mock Bank scrape once for every user returned by ``select(User)``.

    A failure for one user is logged (with traceback) and does not stop the
    loop; the number of users that completed without raising is logged at the end.
    """
    with sync_session_maker() as db:
        result = db.execute(select(User))
        all_users = result.unique().scalars().all()
        logger.info("[BankScraper] Starting Mock Bank scrape for all users | count=%d", len(all_users))

    succeeded = 0
    for current in all_users:
        try:
            _load_mock_bank_transactions(current.id)
        except Exception:
            logger.exception(
                "[BankScraper] Error scraping for user id=%s email=%s",
                current.id,
                getattr(current, 'email', None),
            )
        else:
            succeeded += 1
    logger.info("[BankScraper] Finished Mock Bank scrape for all users | processed=%d", succeeded)
|
|
|
|
|
|
def _load_mock_bank_transactions(user_id: UUID) -> None:
    """Fetch transactions from the mock bank endpoint and store them for one user.

    The endpoint is ``<APP_POD_URL>/mock-bank/scrape`` where ``APP_POD_URL`` is
    read from the environment. Every item of the JSON response becomes a
    ``Transaction`` (``amount``, optional ``description``, ``date`` parsed as
    ``%Y-%m-%d``) attached to *user_id*; all rows are committed together.

    Args:
        user_id: Primary key of the user the transactions are stored for.

    The function logs and returns without writing anything when the user does
    not exist, ``APP_POD_URL`` is unset, the HTTP request fails, or the
    response status is not 200.
    """
    with sync_session_maker() as session:
        user: User | None = session.execute(select(User).where(User.id == user_id)).unique().scalar_one_or_none()
        if user is None:
            logger.warning("User not found for id=%s", user_id)
            return

        # Without this guard the f-string would yield the bogus URL "None/mock-bank/scrape".
        base_url = os.getenv('APP_POD_URL')
        if not base_url:
            logger.error("[BankScraper] APP_POD_URL is not set; skipping Mock Bank scrape | user_id=%s", user_id)
            return

        # Same error-handling style as the CSAS scraper: transport errors are logged, not raised.
        try:
            with httpx.Client() as client:
                response = client.get(f"{base_url}/mock-bank/scrape")
        except httpx.HTTPError:
            logger.exception("[BankScraper] HTTP error during Mock Bank request | user_id=%s", user_id)
            return

        if response.status_code != httpx.codes.OK:
            logger.warning(
                "[BankScraper] Mock Bank scrape returned HTTP %s | user_id=%s",
                response.status_code,
                user_id,
            )
            return

        # NOTE(review): time.strptime yields a struct_time, not a date/datetime —
        # confirm the Transaction.date column/driver accepts it.
        transactions = [
            Transaction(
                amount=item["amount"],
                description=item.get("description"),
                date=strptime(item["date"], "%Y-%m-%d"),
                user_id=user_id,
            )
            for item in response.json()
        ]

        session.add_all(transactions)
        session.commit()
|
|
|
|
|
|
def load_ceska_sporitelna_transactions(user_id: str) -> None:
    """Validate *user_id* as a UUID and run the CSAS scrape for that user.

    Args:
        user_id: Value convertible (via ``str``) to a UUID string.

    If the value cannot be converted to a ``UUID`` an error is logged and
    nothing else happens.
    """
    try:
        parsed_id = UUID(str(user_id))
    except Exception:
        logger.error("Invalid user_id provided to bank_scraper (sync): %r", user_id)
    else:
        # Only reached when the conversion succeeded.
        _load_ceska_sporitelna_transactions(parsed_id)
|
|
|
|
|
|
def load_all_ceska_sporitelna_transactions() -> None:
    """Run the CSAS scrape once for every user returned by ``select(User)``.

    A failure for one user is logged (with traceback) and does not stop the
    loop; the number of users that completed without raising is logged at the end.
    """
    with sync_session_maker() as db:
        result = db.execute(select(User))
        all_users = result.unique().scalars().all()
        logger.info("[BankScraper] Starting CSAS scrape for all users | count=%d", len(all_users))

    succeeded = 0
    for current in all_users:
        try:
            _load_ceska_sporitelna_transactions(current.id)
        except Exception:
            logger.exception(
                "[BankScraper] Error scraping for user id=%s email=%s",
                current.id,
                getattr(current, 'email', None),
            )
        else:
            succeeded += 1
    logger.info("[BankScraper] Finished CSAS scrape for all users | processed=%d", succeeded)
|
|
|
|
|
|
def _load_ceska_sporitelna_transactions(user_id: UUID) -> None:
    """Fetch CSAS sandbox transactions for one user and store them as ``Transaction`` rows.

    Steps:
      1. Load the user; read the ``"csas"`` entry of ``user.config`` and parse it
         with ``json.loads``; it must contain ``access_token``.
      2. List up to 10 accounts through the Erste sandbox API.
      3. For every account with an ``id``, request up to 100 transactions and add
         one ``Transaction`` per entry that has an amount. Entries whose
         ``creditDebitIndicator`` is ``"DBIT"`` are stored with a negative amount.
      4. Commit everything that was added.

    Args:
        user_id: Primary key of the user the transactions are stored for.

    The function returns without writing anything when the user is missing,
    the CSAS config/token is absent or unparsable, or the accounts request
    fails. A failing request for a single account is logged and skipped so the
    remaining accounts are still processed.
    """
    with sync_session_maker() as session:
        user: User | None = session.execute(select(User).where(User.id == user_id)).unique().scalar_one_or_none()
        if user is None:
            logger.warning("User not found for id=%s", user_id)
            return

        cfg = user.config or {}
        if "csas" not in cfg:
            return

        try:
            cfg = json.loads(cfg["csas"])
        except (TypeError, ValueError):
            logger.exception("[BankScraper] Invalid CSAS config | user_id=%s", user_id)
            return
        if not isinstance(cfg, dict) or "access_token" not in cfg:
            return

        base_url = "https://webapi.developers.erstegroup.com/api/csas/sandbox/v4/account-information/my/accounts"
        # NOTE(review): the WEB-API-key is hard-coded in source; consider moving it to configuration.
        headers = {
            "Authorization": f"Bearer {cfg['access_token']}",
            "WEB-API-key": "09fdc637-3c57-4242-95f2-c2205a2438f3",
            "user-involved": "false",
        }

        # One client (and one timeout) is shared by the accounts request and all
        # per-account transaction requests.
        with httpx.Client(cert=CERTS, timeout=httpx.Timeout(20.0)) as client:
            try:
                response = client.get(
                    f"{base_url}?size=10&page=0&sort=iban&order=desc",
                    headers=headers,
                )
            except httpx.HTTPError:
                logger.exception("[BankScraper] HTTP error during CSAS request | user_id=%s", user_id)
                return

            if response.status_code != httpx.codes.OK:
                logger.warning(
                    "[BankScraper] CSAS accounts request returned HTTP %s | user_id=%s",
                    response.status_code,
                    user_id,
                )
                return

            accounts = response.json().get("accounts") or []

            for account in accounts:
                acc_id = account.get("id")
                if not acc_id:
                    continue

                url = f"{base_url}/{acc_id}/transactions?size=100&page=0&sort=bookingdate&order=desc"
                try:
                    response = client.get(url, headers=headers)
                except httpx.HTTPError:
                    # Skip this account only; rows already added for other accounts are kept.
                    logger.exception(
                        "[BankScraper] HTTP error during CSAS transactions request | user_id=%s account_id=%s",
                        user_id,
                        acc_id,
                    )
                    continue

                if response.status_code != httpx.codes.OK:
                    logger.warning(
                        "[BankScraper] CSAS transactions request returned HTTP %s | user_id=%s account_id=%s",
                        response.status_code,
                        user_id,
                        acc_id,
                    )
                    continue

                for transaction in response.json().get("transactions") or []:
                    # ``or {}`` tolerates keys that are present but null.
                    details = (transaction.get("entryDetails") or {}).get("transactionDetails") or {}
                    description = details.get("additionalRemittanceInformation")

                    date_str = (transaction.get("bookingDate") or {}).get("date")
                    # NOTE(review): time.strptime yields a struct_time, not a date/datetime —
                    # confirm the Transaction.date column/driver accepts it.
                    date = strptime(date_str, "%Y-%m-%d") if date_str else None

                    amount = (transaction.get("amount") or {}).get("value")
                    if amount is None:
                        continue
                    if transaction.get("creditDebitIndicator") == "DBIT":
                        amount = -abs(amount)

                    session.add(
                        Transaction(
                            amount=amount,
                            description=description,
                            date=date,
                            user_id=user_id,
                        )
                    )

        session.commit()
|