gwoe-antragspruefer/app/auth.py

225 lines
8.4 KiB
Python
Raw Normal View History

"""Keycloak JWT Authentication for FastAPI (#43).
Read-Only-Endpoints (GET) bleiben offen. Write-Endpoints (POST) erfordern
ein gültiges Keycloak-JWT. Das Modul cached den JWKS (public keys) für
1 Stunde und validiert Token-Signatur + Expiry + Audience + Issuer.
Wenn Keycloak nicht konfiguriert ist (KEYCLOAK_URL leer), ist Auth
**deaktiviert** alle Endpoints sind offen. Das erlaubt lokale
Entwicklung ohne Keycloak-Server.
Usage in main.py:
from .auth import get_current_user, require_auth
@app.post("/api/analyze-drucksache")
async def analyze(request: Request, user = Depends(require_auth)):
... # user ist ein dict mit sub, email, name, roles
@app.get("/api/auth/me")
async def auth_me(user = Depends(get_current_user)):
... # user ist None wenn nicht eingeloggt, dict wenn eingeloggt
"""
import logging
import time
from typing import Optional
import httpx
from fastapi import Depends, HTTPException, Request
from .config import settings
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# JWKS Cache — lädt die Public Keys vom Keycloak-Server, cached für 1h.
# ─────────────────────────────────────────────────────────────────────────────
_jwks_cache: dict = {}
_jwks_cache_time: float = 0
_JWKS_CACHE_TTL = 3600 # 1h
def _keycloak_issuer() -> str:
return f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
def _keycloak_jwks_url() -> str:
return f"{_keycloak_issuer()}/protocol/openid-connect/certs"
async def _get_jwks() -> dict:
"""Fetch or return cached JWKS from Keycloak."""
global _jwks_cache, _jwks_cache_time
if _jwks_cache and (time.time() - _jwks_cache_time) < _JWKS_CACHE_TTL:
return _jwks_cache
url = _keycloak_jwks_url()
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
if resp.status_code == 200:
_jwks_cache = resp.json()
_jwks_cache_time = time.time()
logger.info("JWKS refreshed from %s (%d keys)", url, len(_jwks_cache.get("keys", [])))
return _jwks_cache
else:
logger.error("JWKS fetch failed: HTTP %s from %s", resp.status_code, url)
except Exception:
logger.exception("JWKS fetch error from %s", url)
return _jwks_cache # Return stale cache if refresh fails
def _is_auth_enabled() -> bool:
"""Auth ist nur aktiv wenn alle drei Keycloak-Settings gesetzt sind."""
return bool(
settings.keycloak_url
and settings.keycloak_realm
and settings.keycloak_client_id
)
# ─────────────────────────────────────────────────────────────────────────────
# Token-Extraktion und Validierung
# ─────────────────────────────────────────────────────────────────────────────
def _extract_token(request: Request) -> Optional[str]:
"""Extrahiere Bearer-Token aus Authorization-Header oder Cookie."""
auth = request.headers.get("authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
# Fallback: Cookie (für Browser-Redirects nach Keycloak-Login)
return request.cookies.get("access_token")
async def _validate_token(token: str) -> Optional[dict]:
"""Validiere JWT gegen Keycloak-JWKS. Returns Payload oder None."""
try:
from jose import jwt, JWTError, ExpiredSignatureError
jwks = await _get_jwks()
if not jwks or "keys" not in jwks:
logger.warning("No JWKS available for token validation")
return None
# Decode Header um den Key-ID (kid) zu finden
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
# Finde den passenden Public Key
rsa_key = None
for key in jwks.get("keys", []):
if key.get("kid") == kid:
rsa_key = key
break
if not rsa_key:
logger.warning("JWT kid %s not found in JWKS", kid)
return None
# Keycloak setzt aud="account" für Public Clients, nicht den
# client_id. Prüfe azp (authorized party) statt aud, und
# deaktiviere den strikten aud-Check.
payload = jwt.decode(
token,
rsa_key,
algorithms=["RS256"],
issuer=_keycloak_issuer(),
options={"verify_exp": True, "verify_aud": False},
)
# azp muss unserem Client entsprechen
if payload.get("azp") != settings.keycloak_client_id:
logger.warning("JWT azp %s != expected %s", payload.get("azp"), settings.keycloak_client_id)
return None
return {
"sub": payload.get("sub"),
"email": payload.get("email", ""),
"name": payload.get("preferred_username", payload.get("name", "")),
"roles": payload.get("realm_access", {}).get("roles", []),
}
except ExpiredSignatureError:
logger.debug("JWT expired")
return None
except JWTError as e:
logger.debug("JWT validation failed: %s", e)
return None
except ImportError:
logger.error("python-jose not installed — JWT validation disabled")
return None
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI Dependencies
# ─────────────────────────────────────────────────────────────────────────────
async def get_current_user(request: Request) -> Optional[dict]:
"""Optionale Auth — gibt User-Dict oder None zurück.
Für Endpoints die sowohl mit als auch ohne Login funktionieren
(z.B. UI-Personalisierung, Bookmark-Anzeige).
"""
if not _is_auth_enabled():
return None
token = _extract_token(request)
if not token:
return None
return await _validate_token(token)
async def require_auth(request: Request) -> dict:
"""Pflicht-Auth — gibt User-Dict oder HTTP 401.
Für Write-Endpoints (POST analyze, index).
Wenn Auth nicht konfiguriert ist: ALLE durchlassen (Dev-Modus).
"""
if not _is_auth_enabled():
return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": []}
token = _extract_token(request)
if not token:
raise HTTPException(
status_code=401,
detail="Anmeldung erforderlich",
headers={"WWW-Authenticate": "Bearer"},
)
user = await _validate_token(token)
if not user:
raise HTTPException(
status_code=401,
detail="Token ungültig oder abgelaufen",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# ─────────────────────────────────────────────────────────────────────────────
# Auth-Info-Endpoint
# ─────────────────────────────────────────────────────────────────────────────
def keycloak_login_url(redirect_uri: str) -> str:
"""Baut die Keycloak-Login-URL für den Browser-Redirect."""
if not _is_auth_enabled():
return ""
from urllib.parse import quote
return (
f"{_keycloak_issuer()}/protocol/openid-connect/auth"
f"?client_id={settings.keycloak_client_id}"
f"&redirect_uri={quote(redirect_uri)}"
f"&response_type=code"
f"&scope=openid profile email"
)