"""Keycloak JWT Authentication for FastAPI (#43). Read-Only-Endpoints (GET) bleiben offen. Write-Endpoints (POST) erfordern ein gültiges Keycloak-JWT. Das Modul cached den JWKS (public keys) für 1 Stunde und validiert Token-Signatur + Expiry + Audience + Issuer. Wenn Keycloak nicht konfiguriert ist (KEYCLOAK_URL leer), ist Auth **deaktiviert** — alle Endpoints sind offen. Das erlaubt lokale Entwicklung ohne Keycloak-Server. Usage in main.py: from .auth import get_current_user, require_auth @app.post("/api/analyze-drucksache") async def analyze(request: Request, user = Depends(require_auth)): ... # user ist ein dict mit sub, email, name, roles @app.get("/api/auth/me") async def auth_me(user = Depends(get_current_user)): ... # user ist None wenn nicht eingeloggt, dict wenn eingeloggt """ import logging import time from typing import Optional import httpx from fastapi import Depends, HTTPException, Request from .config import settings logger = logging.getLogger(__name__) # ───────────────────────────────────────────────────────────────────────────── # JWKS Cache — lädt die Public Keys vom Keycloak-Server, cached für 1h. # ───────────────────────────────────────────────────────────────────────────── _jwks_cache: dict = {} _jwks_cache_time: float = 0 _JWKS_CACHE_TTL = 3600 # 1h def _keycloak_issuer() -> str: return f"{settings.keycloak_url}/realms/{settings.keycloak_realm}" def _keycloak_jwks_url() -> str: return f"{_keycloak_issuer()}/protocol/openid-connect/certs" async def _get_jwks() -> dict: """Fetch or return cached JWKS from Keycloak.""" global _jwks_cache, _jwks_cache_time if _jwks_cache and (time.time() - _jwks_cache_time) < _JWKS_CACHE_TTL: return _jwks_cache url = _keycloak_jwks_url() try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(url) if resp.status_code == 200: _jwks_cache = resp.json() _jwks_cache_time = time.time() logger.info("JWKS refreshed from %s (%d keys)", url, len(_jwks_cache.get("keys", []))) return _jwks_cache else: logger.error("JWKS fetch failed: HTTP %s from %s", resp.status_code, url) except Exception: logger.exception("JWKS fetch error from %s", url) return _jwks_cache # Return stale cache if refresh fails def _is_auth_enabled() -> bool: """Auth ist nur aktiv wenn alle drei Keycloak-Settings gesetzt sind.""" return bool( settings.keycloak_url and settings.keycloak_realm and settings.keycloak_client_id ) # ───────────────────────────────────────────────────────────────────────────── # Token-Extraktion und Validierung # ───────────────────────────────────────────────────────────────────────────── def _extract_token(request: Request) -> Optional[str]: """Extrahiere Bearer-Token aus Authorization-Header oder Cookie.""" auth = request.headers.get("authorization", "") if auth.startswith("Bearer "): return auth[7:] # Fallback: Cookie (für Browser-Redirects nach Keycloak-Login) return request.cookies.get("access_token") async def _validate_token(token: str) -> Optional[dict]: """Validiere JWT gegen Keycloak-JWKS. Returns Payload oder None.""" try: from jose import jwt, JWTError, ExpiredSignatureError jwks = await _get_jwks() if not jwks or "keys" not in jwks: logger.warning("No JWKS available for token validation") return None # Decode Header um den Key-ID (kid) zu finden unverified_header = jwt.get_unverified_header(token) kid = unverified_header.get("kid") # Finde den passenden Public Key rsa_key = None for key in jwks.get("keys", []): if key.get("kid") == kid: rsa_key = key break if not rsa_key: logger.warning("JWT kid %s not found in JWKS", kid) return None # Keycloak setzt aud="account" für Public Clients, nicht den # client_id. Prüfe azp (authorized party) statt aud, und # deaktiviere den strikten aud-Check. payload = jwt.decode( token, rsa_key, algorithms=["RS256"], issuer=_keycloak_issuer(), options={"verify_exp": True, "verify_aud": False}, ) # azp muss unserem Client entsprechen if payload.get("azp") != settings.keycloak_client_id: logger.warning("JWT azp %s != expected %s", payload.get("azp"), settings.keycloak_client_id) return None return { "sub": payload.get("sub"), "email": payload.get("email", ""), "name": payload.get("preferred_username", payload.get("name", "")), "roles": payload.get("realm_access", {}).get("roles", []), } except ExpiredSignatureError: logger.debug("JWT expired") return None except JWTError as e: logger.debug("JWT validation failed: %s", e) return None except ImportError: logger.error("python-jose not installed — JWT validation disabled") return None # ───────────────────────────────────────────────────────────────────────────── # FastAPI Dependencies # ───────────────────────────────────────────────────────────────────────────── async def get_current_user(request: Request) -> Optional[dict]: """Optionale Auth — gibt User-Dict oder None zurück. Für Endpoints die sowohl mit als auch ohne Login funktionieren (z.B. UI-Personalisierung, Bookmark-Anzeige). """ if not _is_auth_enabled(): return None token = _extract_token(request) if not token: return None return await _validate_token(token) async def require_auth(request: Request) -> dict: """Pflicht-Auth — gibt User-Dict oder HTTP 401. Für Write-Endpoints (POST analyze, index). Wenn Auth nicht konfiguriert ist: ALLE durchlassen (Dev-Modus). """ if not _is_auth_enabled(): return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": []} token = _extract_token(request) if not token: raise HTTPException( status_code=401, detail="Anmeldung erforderlich", headers={"WWW-Authenticate": "Bearer"}, ) user = await _validate_token(token) if not user: raise HTTPException( status_code=401, detail="Token ungültig oder abgelaufen", headers={"WWW-Authenticate": "Bearer"}, ) return user async def require_admin(request: Request) -> dict: """Admin-Auth — gibt User-Dict oder HTTP 403. Prüft ob der User die Rolle 'admin' oder 'gwoe-admin' hat. Im Dev-Modus (Auth deaktiviert): durchlassen. Für: Batch-Analyse, Programm-Indexierung, Assessment-Löschung. """ if not _is_auth_enabled(): return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]} user = await require_auth(request) roles = user.get("roles", []) if "admin" in roles or "gwoe-admin" in roles: return user raise HTTPException( status_code=403, detail="Admin-Berechtigung erforderlich", ) return user # ───────────────────────────────────────────────────────────────────────────── # Auth-Info-Endpoint # ───────────────────────────────────────────────────────────────────────────── def keycloak_login_url(redirect_uri: str) -> str: """Baut die Keycloak-Login-URL für den Browser-Redirect.""" if not _is_auth_enabled(): return "" from urllib.parse import quote return ( f"{_keycloak_issuer()}/protocol/openid-connect/auth" f"?client_id={settings.keycloak_client_id}" f"&redirect_uri={quote(redirect_uri)}" f"&response_type=code" f"&scope=openid profile email" )