Merge branch 'main' into 20-create-a-controller-layer-on-backend-side

This commit is contained in:
Dejan Ribarovski
2025-10-13 14:03:24 +02:00
committed by GitHub
20 changed files with 437 additions and 18 deletions

View File

@@ -7,11 +7,12 @@ from app.services.user_service import current_active_verified_user
from app.api.auth import router as auth_router
from app.api.categories import router as categories_router
from app.api.transactions import router as transactions_router
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users, get_oauth_provider
app = FastAPI()
fastApi = FastAPI()
# CORS for frontend dev server
app.add_middleware(
fastApi.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
@@ -26,13 +27,35 @@ app.include_router(auth_router)
app.include_router(categories_router)
app.include_router(transactions_router)
fastApi.include_router(
fastapi_users.get_oauth_router(
get_oauth_provider("MojeID"),
auth_backend,
"SECRET",
associate_by_email=True,
),
prefix="/auth/mojeid",
tags=["auth"],
)
fastApi.include_router(
fastapi_users.get_oauth_router(
get_oauth_provider("BankID"),
auth_backend,
"SECRET",
associate_by_email=True,
),
prefix="/auth/bankid",
tags=["auth"],
)
# Liveness/root endpoint
@app.get("/", include_in_schema=False)
@fastApi.get("/", include_in_schema=False)
async def root():
return {"status": "ok"}
@app.get("/authenticated-route")
@fastApi.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_verified_user)):
return {"message": f"Hello {user.email}!"}

View File

@@ -1,13 +1,19 @@
from sqlalchemy import Column, String
from sqlalchemy.orm import relationship
from fastapi_users.db import SQLAlchemyBaseUserTableUUID
from sqlalchemy.orm import relationship, mapped_column, Mapped
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyBaseOAuthAccountTableUUID
from app.core.base import Base
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
# BankID token is longer than default
access_token: Mapped[str] = mapped_column(String(length=4096), nullable=False)
class User(SQLAlchemyBaseUserTableUUID, Base):
first_name = Column(String(length=100), nullable=True)
last_name = Column(String(length=100), nullable=True)
oauth_accounts = relationship("OAuthAccount", lazy="joined")
# Relationship
transactions = relationship("Transaction", back_populates="user")
categories = relationship("Category", back_populates="user")
categories = relationship("Category", back_populates="user")

View File

View File

@@ -0,0 +1,50 @@
import secrets
from typing import Optional, Literal
from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class BankID(CustomOpenID):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
"https://oidc.sandbox.bankid.cz/.well-known/openid-configuration",
"BankID",
base_scopes=["openid", "profile.email", "profile.name"],
)
async def get_user_info(self, token: str) -> dict:
info = await self.get_profile(token)
return {
"first_name": info.get("given_name"),
"last_name": info.get("family_name"),
}
async def get_authorization_url(
self,
redirect_uri: str,
state: Optional[str] = None,
scope: Optional[list[str]] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[Literal["plain", "S256"]] = None,
extras_params: Optional[T] = None,
) -> str:
if extras_params is None:
extras_params = {}
# BankID requires random nonce parameter for security
# https://developer.bankid.cz/docs/security_sep
extras_params["nonce"] = secrets.token_urlsafe()
return await super().get_authorization_url(
redirect_uri,
state,
scope,
code_challenge,
code_challenge_method,
extras_params,
)

View File

@@ -0,0 +1,6 @@
from httpx_oauth.clients.openid import OpenID
class CustomOpenID(OpenID):
async def get_user_info(self, token: str) -> dict:
raise NotImplementedError()

View File

@@ -0,0 +1,56 @@
import json
from typing import Optional, Literal, Any
from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class MojeIDOAuth(CustomOpenID):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
"https://mojeid.regtest.nic.cz/.well-known/openid-configuration/",
"MojeID",
base_scopes=["openid", "email", "profile"],
)
async def get_user_info(self, token: str) -> Optional[Any]:
info = await self.get_profile(token)
return {
"first_name": info.get("given_name"),
"last_name": info.get("family_name"),
}
async def get_authorization_url(
self,
redirect_uri: str,
state: Optional[str] = None,
scope: Optional[list[str]] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[Literal["plain", "S256"]] = None,
extras_params: Optional[T] = None,
) -> str:
required_fields = {
'id_token': {
'name': {'essential': True},
'given_name': {'essential': True},
'family_name': {'essential': True},
'email': {'essential': True},
'mojeid_valid': {'essential': True},
}}
if extras_params is None:
extras_params = {}
extras_params["claims"] = json.dumps(required_fields)
return await super().get_authorization_url(
redirect_uri,
state,
scope,
code_challenge,
code_challenge_method,
extras_params,
)

View File

@@ -4,11 +4,13 @@ from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_users.db import SQLAlchemyUserDatabase
from ..core.db import async_session_maker
from ..models.user import User
from ..models.user import User, OAuthAccount
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)
yield SQLAlchemyUserDatabase(session, User, OAuthAccount)

View File

@@ -3,26 +3,66 @@ import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
)
from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase
from httpx_oauth.oauth2 import BaseOAuth2
from app.models.user import User
from app.oauth.bank_id import BankID
from app.oauth.custom_openid import CustomOpenID
from app.oauth.moje_id import MojeIDOAuth
from app.services.db import get_user_db
from app.core.queue import enqueue_email
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
providers = {
"MojeID": MojeIDOAuth(
os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
),
"BankID": BankID(
os.getenv("BANKID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("BANKID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
)
}
def get_oauth_provider(name: str) -> Optional[BaseOAuth2]:
if name not in providers:
return None
return providers[name]
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def oauth_callback(self: "BaseUserManager[models.UOAP, models.ID]", oauth_name: str, access_token: str,
account_id: str, account_email: str, expires_at: Optional[int] = None,
refresh_token: Optional[str] = None, request: Optional[Request] = None, *,
associate_by_email: bool = False, is_verified_by_default: bool = False) -> models.UOAP:
user = await super().oauth_callback(oauth_name, access_token, account_id, account_email, expires_at,
refresh_token, request, associate_by_email=associate_by_email,
is_verified_by_default=is_verified_by_default)
# set additional user info from the OAuth provider
provider = get_oauth_provider(oauth_name)
if provider is not None and isinstance(provider, CustomOpenID):
update_dict = await provider.get_user_info(access_token)
await self.user_db.update(user, update_dict)
return user
async def on_after_register(self, user: User, request: Optional[Request] = None):
await self.request_verify(user, request)
@@ -52,14 +92,18 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
print("[Email Fallback] Subject:", subject)
print("[Email Fallback] Body:\n", body)
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET, lifetime_seconds=3600)
auth_backend = AuthenticationBackend(
name="jwt",
transport=bearer_transport,
@@ -70,4 +114,3 @@ fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True)
current_active_verified_user = fastapi_users.current_user(active=True, verified=True)