mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 06:57:47 +01:00
117 lines
4.4 KiB
Python
117 lines
4.4 KiB
Python
import os
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import Depends, Request
|
|
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
|
|
from fastapi_users.authentication import (
|
|
AuthenticationBackend,
|
|
BearerTransport,
|
|
)
|
|
from fastapi_users.authentication.strategy.jwt import JWTStrategy
|
|
from fastapi_users.db import SQLAlchemyUserDatabase
|
|
from httpx_oauth.oauth2 import BaseOAuth2
|
|
|
|
from app.models.user import User
|
|
from app.oauth.bank_id import BankID
|
|
from app.workers.celery_tasks import send_email
|
|
from app.oauth.custom_openid import CustomOpenID
|
|
from app.oauth.moje_id import MojeIDOAuth
|
|
from app.services.db import get_user_db
|
|
|
|
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
|
|
|
|
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
|
|
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
|
|
|
|
providers = {
|
|
"MojeID": MojeIDOAuth(
|
|
os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
|
|
os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
|
|
),
|
|
"BankID": BankID(
|
|
os.getenv("BANKID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
|
|
os.getenv("BANKID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
|
|
),
|
|
}
|
|
|
|
|
|
def get_oauth_provider(name: str) -> Optional[BaseOAuth2]:
|
|
if name not in providers:
|
|
return None
|
|
return providers[name]
|
|
|
|
|
|
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
|
reset_password_token_secret = SECRET
|
|
verification_token_secret = SECRET
|
|
|
|
async def oauth_callback(self: "BaseUserManager[models.UOAP, models.ID]", oauth_name: str, access_token: str,
|
|
account_id: str, account_email: str, expires_at: Optional[int] = None,
|
|
refresh_token: Optional[str] = None, request: Optional[Request] = None, *,
|
|
associate_by_email: bool = False, is_verified_by_default: bool = False) -> models.UOAP:
|
|
|
|
user = await super().oauth_callback(oauth_name, access_token, account_id, account_email, expires_at,
|
|
refresh_token, request, associate_by_email=associate_by_email,
|
|
is_verified_by_default=is_verified_by_default)
|
|
|
|
# set additional user info from the OAuth provider
|
|
provider = get_oauth_provider(oauth_name)
|
|
if provider is not None and isinstance(provider, CustomOpenID):
|
|
update_dict = await provider.get_user_info(access_token)
|
|
await self.user_db.update(user, update_dict)
|
|
|
|
return user
|
|
|
|
async def on_after_register(self, user: User, request: Optional[Request] = None):
|
|
await self.request_verify(user, request)
|
|
|
|
async def on_after_forgot_password(
|
|
self, user: User, token: str, request: Optional[Request] = None
|
|
):
|
|
print(f"User {user.id} has forgot their password. Reset token: {token}")
|
|
|
|
async def on_after_request_verify(
|
|
self, user: User, token: str, request: Optional[Request] = None
|
|
):
|
|
verify_frontend_link = f"{FRONTEND_URL}/verify?token={token}"
|
|
verify_backend_link = f"{BACKEND_URL}/auth/verify?token={token}"
|
|
subject = "Ověření účtu"
|
|
body = (
|
|
"Ahoj,\n\n"
|
|
"děkujeme za registraci. Prosíme, ověř svůj účet kliknutím na tento odkaz:\n"
|
|
f"{verify_frontend_link}\n\n"
|
|
"Pokud by odkaz nefungoval, můžeš použít i přímý odkaz na backend:\n"
|
|
f"{verify_backend_link}\n\n"
|
|
"Pokud jsi registraci neprováděl(a), tento email ignoruj.\n"
|
|
)
|
|
try:
|
|
send_email.delay(user.email, subject, body)
|
|
except Exception as e:
|
|
print("[Email Fallback] To:", user.email)
|
|
print("[Email Fallback] Subject:", subject)
|
|
print("[Email Fallback] Body:\n", body)
|
|
|
|
|
|
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
|
|
yield UserManager(user_db)
|
|
|
|
|
|
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
|
|
|
|
|
|
def get_jwt_strategy() -> JWTStrategy:
|
|
return JWTStrategy(secret=SECRET, lifetime_seconds=604800)
|
|
|
|
|
|
auth_backend = AuthenticationBackend(
|
|
name="jwt",
|
|
transport=bearer_transport,
|
|
get_strategy=get_jwt_strategy,
|
|
)
|
|
|
|
fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
|
|
|
|
current_active_user = fastapi_users.current_user(active=True)
|
|
current_active_verified_user = fastapi_users.current_user(active=True, verified=True)
|