from typing import Optional import re import jwt from fastapi import Request # Simple in-memory revocation store for revoked JWT tokens. # # Limitations: # - All revoked tokens will be lost if the process restarts (data loss on restart). # - Not suitable for multi-instance deployments: the revocation list is not shared between instances. # A token revoked in one instance will not be recognized as revoked in others. # # For production, use a persistent and shared store (e.g., Redis or a database). _REVOKED_TOKENS: set[str] = set() # Bearer token regex _BEARER_RE = re.compile(r"^[Bb]earer\s+(.+)$") def extract_bearer_token(request: Request) -> Optional[str]: auth = request.headers.get("authorization") if not auth: return None m = _BEARER_RE.match(auth) if not m: return None return m.group(1).strip() def revoke_token(token: str) -> None: if token: _REVOKED_TOKENS.add(token) def is_token_revoked(token: str) -> bool: return token in _REVOKED_TOKENS def decode_and_verify_jwt(token: str, secret: str) -> dict: """ Decode the JWT using the shared secret, verifying expiration and signature. Audience is not verified here to be compatible with fastapi-users default tokens. Raises jwt.ExpiredSignatureError if expired. Raises jwt.InvalidTokenError for other issues. Returns the decoded payload dict on success. """ return jwt.decode( token, secret, algorithms=["HS256"], options={"verify_aud": False}, ) # verify_exp is True by default