Files
uis-cloud-computing/7project/backend/app/services/bank_scraper.py

122 lines
4.4 KiB
Python

import json
import logging
from datetime import datetime
from os.path import dirname, join
from time import strptime
from uuid import UUID

import httpx
from sqlalchemy import select

from app.core.db import async_session_maker
from app.models.transaction import Transaction
from app.models.user import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The "oauth" directory that sits next to this file's parent directory.
OAUTH_DIR = join(dirname(__file__), "..", "oauth")

# (certificate, private key) file paths, passed as ``cert=`` to the httpx
# clients created in this module.
CERTS = tuple(
    join(OAUTH_DIR, file_name)
    for file_name in ("public_key.pem", "private_key.key")
)
async def aload_ceska_sporitelna_transactions(user_id: str) -> None:
    """Scrape CSAS transactions for a single user identified by ``user_id``.

    The identifier is stringified and parsed as a UUID. If parsing fails the
    problem is logged and nothing else happens; otherwise the work is handed
    to ``_aload_ceska_sporitelna_transactions``.

    Args:
        user_id: Textual form of the user's UUID (anything whose ``str()`` is
            a valid UUID is accepted).
    """
    try:
        parsed_id = UUID(str(user_id))
    except Exception:
        logger.error("Invalid user_id provided to bank_scraper (async): %r", user_id)
    else:
        await _aload_ceska_sporitelna_transactions(parsed_id)
async def aload_all_ceska_sporitelna_transactions() -> None:
    """Run the CSAS transaction scrape for every user in the database.

    Users are processed one after another. A failure for one user is logged
    with its traceback and does not stop the remaining users. The number of
    users found and the number processed without an exception are logged.
    """
    async with async_session_maker() as session:
        query_result = await session.execute(select(User))
        all_users = query_result.unique().scalars().all()
        logger.info("[BankScraper] Starting CSAS scrape for all users | count=%d", len(all_users))

        # Counts only the users whose scrape finished without raising.
        processed = 0
        for current_user in all_users:
            try:
                await _aload_ceska_sporitelna_transactions(current_user.id)
            except Exception:
                logger.exception(
                    "[BankScraper] Error scraping for user id=%s email=%s",
                    current_user.id,
                    getattr(current_user, 'email', None),
                )
            else:
                processed += 1

        logger.info("[BankScraper] Finished CSAS scrape for all users | processed=%d", processed)
async def _fetch_csas_transactions(access_token: str, user_id: UUID) -> list[dict]:
    """Download the raw transaction payloads of every CSAS account of one user.

    The account list is requested first; then the transactions of each
    returned account are requested with the same client.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        user_id: Only used to tag log records.

    Returns:
        The decoded ``transactions`` entries of all accounts that could be
        fetched. Empty when the account listing fails or returns a non-OK
        status; accounts whose transaction request fails are skipped.
    """
    accounts_url = (
        "https://webapi.developers.erstegroup.com/api/csas/sandbox/v4/"
        "account-information/my/accounts"
    )
    # NOTE(review): the WEB-API key is hard-coded in source; it should
    # presumably be loaded from configuration / secret storage instead.
    headers = {
        "Authorization": f"Bearer {access_token}",
        "WEB-API-key": "09fdc637-3c57-4242-95f2-c2205a2438f3",
        "user-involved": "false",
    }

    payloads: list[dict] = []
    # One client (one TLS setup, one explicit timeout) serves all requests.
    async with httpx.AsyncClient(cert=CERTS, timeout=httpx.Timeout(20.0)) as client:
        try:
            response = await client.get(
                f"{accounts_url}?size=10&page=0&sort=iban&order=desc",
                headers=headers,
            )
            if response.status_code != httpx.codes.OK:
                return payloads
            accounts = response.json().get("accounts") or []
        except (httpx.HTTPError, ValueError):
            # ValueError covers a response body that is not valid JSON.
            logger.exception("[BankScraper] HTTP error during CSAS request | user_id=%s", user_id)
            return payloads

        for account in accounts:
            account_id = account.get("id")
            if account_id is None:
                continue
            try:
                response = await client.get(
                    f"{accounts_url}/{account_id}/transactions"
                    "?size=100&page=0&sort=bookingdate&order=desc",
                    headers=headers,
                )
                if response.status_code != httpx.codes.OK:
                    continue
                payloads.extend(response.json().get("transactions") or [])
            except (httpx.HTTPError, ValueError):
                # A failure for one account must not discard the others.
                logger.exception(
                    "[BankScraper] HTTP error during CSAS transactions request"
                    " | user_id=%s account_id=%s",
                    user_id,
                    account_id,
                )
    return payloads


async def _aload_ceska_sporitelna_transactions(user_id: UUID) -> None:
    """Fetch one user's CSAS transactions and store them as ``Transaction`` rows.

    Returns silently when the user does not exist, has no ``"csas"`` entry in
    ``user.config``, or that entry lacks an ``access_token``. Otherwise every
    transaction returned by the bank API is added to the session and the
    session is committed once at the end.

    NOTE(review): each run inserts whatever the API returns; no
    de-duplication against previously stored rows is visible in this
    function - confirm whether uniqueness is enforced elsewhere.

    Args:
        user_id: Primary key of the user to scrape for.
    """
    async with async_session_maker() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        user = result.unique().scalar_one_or_none()
        if user is None:
            logger.warning("User not found for id=%s", user_id)
            return

        config = user.config or {}
        if "csas" not in config:
            return
        try:
            # The CSAS settings are stored as a JSON-encoded string.
            csas_cfg = json.loads(config["csas"])
        except (TypeError, ValueError):
            logger.warning("[BankScraper] Unreadable CSAS config | user_id=%s", user_id)
            return
        if not isinstance(csas_cfg, dict) or "access_token" not in csas_cfg:
            return

        payloads = await _fetch_csas_transactions(csas_cfg["access_token"], user_id)

        for payload in payloads:
            # "or {}" also covers keys that are present but JSON null.
            details = (payload.get("entryDetails") or {}).get("transactionDetails") or {}
            description = details.get("additionalRemittanceInformation")
            date_str = (payload.get("bookingDate") or {}).get("date")
            try:
                # A real date object instead of time.struct_time, which is not
                # a date value and is only accepted by some DB drivers.
                # Assumes Transaction.date stores a calendar date - TODO confirm.
                booked_on = (
                    datetime.strptime(date_str, "%Y-%m-%d").date() if date_str else None
                )
                amount = payload["amount"]["value"]
            except (KeyError, TypeError, ValueError):
                logger.warning(
                    "[BankScraper] Skipping malformed CSAS transaction | user_id=%s",
                    user_id,
                )
                continue
            session.add(
                Transaction(
                    amount=amount,
                    description=description,
                    date=booked_on,
                    user_id=user_id,
                )
            )

        await session.commit()