"""Authentication wiring: OAuth providers, the user manager and the JWT backend."""

import os
import uuid
from typing import Optional

from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
from fastapi_users.authentication import (
    AuthenticationBackend,
    BearerTransport,
)
from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase
from httpx_oauth.oauth2 import BaseOAuth2

from app.models.user import User
from app.oauth.bank_id import BankID
from app.oauth.csas import CSASOAuth
from app.oauth.custom_openid import CustomOpenID
from app.oauth.moje_id import MojeIDOAuth
from app.services.db import get_user_db
from app.core.queue import enqueue_email

# NOTE(review): the fallback value is a well-known placeholder. It signs JWTs
# as well as reset/verification tokens, so SECRET must be set in any real
# deployment.
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")

# OAuth clients keyed by provider name; credentials come from the environment.
providers = {
    "MojeID": MojeIDOAuth(
        os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
        os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
    ),
    "BankID": BankID(
        os.getenv("BANKID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
        os.getenv("BANKID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
    ),
}


def get_oauth_provider(name: str) -> Optional[BaseOAuth2]:
    """Return the OAuth client registered under *name*, or ``None`` if unknown."""
    return providers.get(name)


class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
    """User manager adding OAuth profile enrichment and verification e-mails."""

    reset_password_token_secret = SECRET
    verification_token_secret = SECRET

    async def oauth_callback(
        self: "BaseUserManager[models.UOAP, models.ID]",
        oauth_name: str,
        access_token: str,
        account_id: str,
        account_email: str,
        expires_at: Optional[int] = None,
        refresh_token: Optional[str] = None,
        request: Optional[Request] = None,
        *,
        associate_by_email: bool = False,
        is_verified_by_default: bool = False,
    ) -> models.UOAP:
        """Run the stock OAuth callback, then enrich the user from the provider.

        All arguments are forwarded unchanged to the parent implementation.
        When the provider registered under ``oauth_name`` is a
        ``CustomOpenID`` instance, its ``get_user_info(access_token)`` result
        is written to the user through ``self.user_db.update``.

        Returns:
            The user object produced by the parent ``oauth_callback``.
        """
        user = await super().oauth_callback(
            oauth_name,
            access_token,
            account_id,
            account_email,
            expires_at,
            refresh_token,
            request,
            associate_by_email=associate_by_email,
            is_verified_by_default=is_verified_by_default,
        )

        # set additional user info from the OAuth provider
        provider = get_oauth_provider(oauth_name)
        if isinstance(provider, CustomOpenID):
            # presumably a mapping of user-model fields -> values; verify
            # against CustomOpenID.get_user_info
            update_dict = await provider.get_user_info(access_token)
            # Nothing to persist when the provider returned no extra data;
            # this also avoids handing None to the database layer.
            if update_dict:
                await self.user_db.update(user, update_dict)

        return user

    async def on_after_register(self, user: User, request: Optional[Request] = None):
        """Request account verification for a freshly registered user."""
        await self.request_verify(user, request)

    async def on_after_forgot_password(
        self, user: User, token: str, request: Optional[Request] = None
    ):
        """Report a password-reset request on stdout."""
        # NOTE(review): this writes the reset token to stdout; acceptable for
        # local development only - confirm it is replaced before production.
        print(f"User {user.id} has forgot their password. Reset token: {token}")

    async def on_after_request_verify(
        self, user: User, token: str, request: Optional[Request] = None
    ):
        """Queue the verification e-mail; print it to stdout if queuing fails."""
        verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}"
        verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}"

        subject = "Ověření účtu"
        body = (
            "Ahoj,\n\n"
            "děkujeme za registraci. Prosíme, ověř svůj účet kliknutím na tento odkaz:\n"
            f"{verify_frontend_link}\n\n"
            "Pokud by odkaz nefungoval, můžeš použít i přímý odkaz na backend:\n"
            f"{verify_backend_link}\n\n"
            "Pokud jsi registraci neprováděl(a), tento email ignoruj.\n"
        )

        try:
            enqueue_email(to=user.email, subject=subject, body=body)
        except Exception as exc:
            # Best-effort delivery: a queue failure must not break the request,
            # but the cause is reported instead of being silently dropped.
            print("[Email Fallback] Error:", repr(exc))
            print("[Email Fallback] To:", user.email)
            print("[Email Fallback] Subject:", subject)
            print("[Email Fallback] Body:\n", body)


async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
    """FastAPI dependency yielding a ``UserManager`` bound to ``user_db``."""
    yield UserManager(user_db)


bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")


def get_jwt_strategy() -> JWTStrategy:
    """Build the JWT strategy: signed with ``SECRET``, 3600-second lifetime."""
    return JWTStrategy(secret=SECRET, lifetime_seconds=3600)


auth_backend = AuthenticationBackend(
    name="jwt",
    transport=bearer_transport,
    get_strategy=get_jwt_strategy,
)

fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])

current_active_user = fastapi_users.current_user(active=True)
current_active_verified_user = fastapi_users.current_user(active=True, verified=True)