mirror of
https://github.com/dat515-2025/Group-8.git
synced 2026-03-22 06:57:47 +01:00
45 lines
1.2 KiB
Python
45 lines
1.2 KiB
Python
from typing import Optional
|
|
import re
|
|
import jwt
|
|
from fastapi import Request
|
|
|
|
# Simple in-memory revocation store (contents are lost when the process exits). In production, consider Redis or a database.
|
|
# Token strings that have been revoked; written by revoke_token() and
# queried by is_token_revoked(). Lives only in this process's memory.
_REVOKED_TOKENS: set[str] = set()
|
|
|
|
# Matches an Authorization header value of the form "Bearer <credentials>".
# The scheme is matched case-insensitively because HTTP authentication scheme
# names are case-insensitive (RFC 9110, section 11.1).
_BEARER_RE = re.compile(r"^bearer\s+(.+)$", re.IGNORECASE)


def extract_bearer_token(request: Request) -> Optional[str]:
    """Return the bearer token carried by the request's Authorization header.

    Args:
        request: Incoming request; only ``request.headers`` is read.

    Returns:
        The credentials that follow the ``Bearer`` scheme, with surrounding
        whitespace removed, or ``None`` when the header is missing, uses a
        different scheme, or contains nothing but whitespace after the scheme.
    """
    auth = request.headers.get("authorization")
    if not auth:
        return None

    m = _BEARER_RE.match(auth)
    if m is None:
        return None

    # ``.+`` can capture a run of pure whitespace; after stripping that would
    # be an empty string, which is not a usable token, so report it as absent.
    return m.group(1).strip() or None
|
|
|
|
|
|
def revoke_token(token: str) -> None:
    """Record *token* in the in-memory revocation set.

    Falsy values (for example an empty string) are ignored.
    """
    if not token:
        return
    _REVOKED_TOKENS.add(token)
|
|
|
|
|
|
def is_token_revoked(token: str) -> bool:
    """Report whether *token* is present in the in-memory revocation set."""
    revoked = token in _REVOKED_TOKENS
    return revoked
|
|
|
|
|
|
def decode_and_verify_jwt(token: str, secret: str) -> dict:
    """Validate *token* against the shared *secret* and return its payload.

    The signature is checked using HS256 only, and expiration is verified
    (``verify_exp`` is left at its default of True). Audience verification is
    turned off to stay compatible with fastapi-users default tokens.

    Raises:
        jwt.ExpiredSignatureError: The token has expired.
        jwt.InvalidTokenError: Any other validation problem.

    Returns:
        The decoded payload dict on success.
    """
    allowed_algorithms = ["HS256"]
    # Only the audience check is disabled; verify_exp stays True by default.
    decode_options = {"verify_aud": False}
    payload = jwt.decode(
        token,
        secret,
        algorithms=allowed_algorithms,
        options=decode_options,
    )
    return payload