Compare commits

...

6 Commits

7 changed files with 123 additions and 11 deletions

View File

@@ -0,0 +1,38 @@
"""change token length
Revision ID: 5ab2e654c96e
Revises: 7af8f296d089
Create Date: 2025-10-11 21:07:41.930470
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision: str = '5ab2e654c96e'
down_revision: Union[str, Sequence[str], None] = '7af8f296d089'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('oauth_account', 'access_token',
existing_type=mysql.VARCHAR(length=1024),
type_=sa.String(length=4096),
existing_nullable=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('oauth_account', 'access_token',
existing_type=sa.String(length=4096),
type_=mysql.VARCHAR(length=1024),
existing_nullable=False)
# ### end Alembic commands ###

View File

@@ -1,11 +1,10 @@
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
import app.services.user_service
from app.models.user import User from app.models.user import User
from app.schemas.user import UserCreate, UserRead, UserUpdate from app.schemas.user import UserCreate, UserRead, UserUpdate
from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users from app.services.user_service import auth_backend, current_active_verified_user, fastapi_users, get_oauth_provider
fastApi = FastAPI() fastApi = FastAPI()
@@ -47,7 +46,7 @@ fastApi.include_router(
fastApi.include_router( fastApi.include_router(
fastapi_users.get_oauth_router( fastapi_users.get_oauth_router(
app.services.user_service.get_oauth_provider("MojeID"), get_oauth_provider("MojeID"),
auth_backend, auth_backend,
"SECRET", "SECRET",
associate_by_email=True, associate_by_email=True,
@@ -56,6 +55,17 @@ fastApi.include_router(
tags=["auth"], tags=["auth"],
) )
fastApi.include_router(
fastapi_users.get_oauth_router(
get_oauth_provider("BankID"),
auth_backend,
"SECRET",
associate_by_email=True,
),
prefix="/auth/bankid",
tags=["auth"],
)
# Liveness/root endpoint # Liveness/root endpoint
@fastApi.get("/", include_in_schema=False) @fastApi.get("/", include_in_schema=False)

View File

@@ -1,13 +1,12 @@
from typing import List
from sqlalchemy import Column, String from sqlalchemy import Column, String
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship, mapped_column, Mapped
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyBaseOAuthAccountTableUUID from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyBaseOAuthAccountTableUUID
from app.core.base import Base from app.core.base import Base
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base): class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
pass # BankID token is longer than default
access_token: Mapped[str] = mapped_column(String(length=4096), nullable=False)
class User(SQLAlchemyBaseUserTableUUID, Base): class User(SQLAlchemyBaseUserTableUUID, Base):

View File

@@ -0,0 +1,50 @@
import secrets
from typing import Optional, Literal
from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class BankID(CustomOpenID):
def __init__(self, client_id: str, client_secret: str):
super().__init__(
client_id,
client_secret,
"https://oidc.sandbox.bankid.cz/.well-known/openid-configuration",
"BankID",
base_scopes=["openid", "profile.email", "profile.name"],
)
async def get_user_info(self, token: str) -> dict:
info = await self.get_profile(token)
return {
"first_name": info.get("given_name"),
"last_name": info.get("family_name"),
}
async def get_authorization_url(
self,
redirect_uri: str,
state: Optional[str] = None,
scope: Optional[list[str]] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[Literal["plain", "S256"]] = None,
extras_params: Optional[T] = None,
) -> str:
if extras_params is None:
extras_params = {}
# BankID requires random nonce parameter for security
# https://developer.bankid.cz/docs/security_sep
extras_params["nonce"] = secrets.token_urlsafe()
return await super().get_authorization_url(
redirect_uri,
state,
scope,
code_challenge,
code_challenge_method,
extras_params,
)

View File

@@ -0,0 +1,6 @@
from httpx_oauth.clients.openid import OpenID
class CustomOpenID(OpenID):
async def get_user_info(self, token: str) -> dict:
raise NotImplementedError()

View File

@@ -1,11 +1,12 @@
import json import json
from typing import Optional, Literal, Any from typing import Optional, Literal, Any
from httpx_oauth.clients.openid import OpenID
from httpx_oauth.oauth2 import T from httpx_oauth.oauth2 import T
from app.oauth.custom_openid import CustomOpenID
class MojeIDOAuth(OpenID):
class MojeIDOAuth(CustomOpenID):
def __init__(self, client_id: str, client_secret: str): def __init__(self, client_id: str, client_secret: str):
super().__init__( super().__init__(
client_id, client_id,

View File

@@ -10,13 +10,17 @@ from fastapi_users.authentication import (
) )
from fastapi_users.authentication.strategy.jwt import JWTStrategy from fastapi_users.authentication.strategy.jwt import JWTStrategy
from fastapi_users.db import SQLAlchemyUserDatabase from fastapi_users.db import SQLAlchemyUserDatabase
from httpx_oauth.oauth2 import BaseOAuth2
from app.models.user import User from app.models.user import User
from app.oauth.bank_id import BankID
from app.oauth.custom_openid import CustomOpenID
from app.oauth.moje_id import MojeIDOAuth from app.oauth.moje_id import MojeIDOAuth
from app.services.db import get_user_db from app.services.db import get_user_db
from app.core.queue import enqueue_email from app.core.queue import enqueue_email
SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET") SECRET = os.getenv("SECRET", "CHANGE_ME_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173") FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:5173")
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000") BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000")
@@ -24,11 +28,15 @@ providers = {
"MojeID": MojeIDOAuth( "MojeID": MojeIDOAuth(
os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"), os.getenv("MOJEID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"), os.getenv("MOJEID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
),
"BankID": BankID(
os.getenv("BANKID_CLIENT_ID", "CHANGE_ME_CLIENT_ID"),
os.getenv("BANKID_CLIENT_SECRET", "CHANGE_ME_CLIENT_SECRET"),
) )
} }
def get_oauth_provider(name: str) -> Optional[MojeIDOAuth]: def get_oauth_provider(name: str) -> Optional[BaseOAuth2]:
if name not in providers: if name not in providers:
return None return None
return providers[name] return providers[name]
@@ -49,7 +57,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# set additional user info from the OAuth provider # set additional user info from the OAuth provider
provider = get_oauth_provider(oauth_name) provider = get_oauth_provider(oauth_name)
if provider is not None and hasattr(provider, "get_user_info"): if provider is not None and isinstance(provider, CustomOpenID):
update_dict = await provider.get_user_info(access_token) update_dict = await provider.get_user_info(access_token)
await self.user_db.update(user, update_dict) await self.user_db.update(user, update_dict)