Queue (queue.py):
- QUEUE_CONCURRENCY ENV (default 3) statt hartcodiert 1
- N Worker-Coroutines via asyncio tasks (nicht Semaphore — jeder
Worker pickt eigenständig von der Queue)
- Per-Job-Tracking: job_id → {status, drucksache, duration, error}
- get_queue_status() liefert jobs-Array für UI-Tabelle
Visualisierung (index.html):
- Fortschrittsbalken (X/Y fertig, grün)
- Job-Tabelle: Drucksache + Status-Icon + Dauer
- Fertige Jobs klickbar → Detail-Ansicht
- Auto-Refresh alle 3s
Admin-Schutz (auth.py + main.py):
- Neue require_admin Dependency: prüft Keycloak-Rolle "admin" oder
"gwoe-admin". Im Dev-Modus durchlassen.
- Batch-Analyse, Programme-Index, Assessment-Delete: require_admin
- Einzelanalyse, Bookmarks, Kommentare: bleiben require_auth
- Keycloak: Rolle "admin" erstellt + User tobias zugewiesen
Tests: 206 passed.
Refs: #99
248 lines
9.0 KiB
Python
248 lines
9.0 KiB
Python
"""Keycloak JWT Authentication for FastAPI (#43).
|
|
|
|
Read-Only-Endpoints (GET) bleiben offen. Write-Endpoints (POST) erfordern
|
|
ein gültiges Keycloak-JWT. Das Modul cached den JWKS (public keys) für
|
|
1 Stunde und validiert Token-Signatur + Expiry + Audience + Issuer.
|
|
|
|
Wenn Keycloak nicht konfiguriert ist (KEYCLOAK_URL leer), ist Auth
|
|
**deaktiviert** — alle Endpoints sind offen. Das erlaubt lokale
|
|
Entwicklung ohne Keycloak-Server.
|
|
|
|
Usage in main.py:
|
|
|
|
from .auth import get_current_user, require_auth
|
|
|
|
@app.post("/api/analyze-drucksache")
|
|
async def analyze(request: Request, user = Depends(require_auth)):
|
|
... # user ist ein dict mit sub, email, name, roles
|
|
|
|
@app.get("/api/auth/me")
|
|
async def auth_me(user = Depends(get_current_user)):
|
|
... # user ist None wenn nicht eingeloggt, dict wenn eingeloggt
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from fastapi import Depends, HTTPException, Request
|
|
|
|
from .config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# JWKS Cache — lädt die Public Keys vom Keycloak-Server, cached für 1h.
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_jwks_cache: dict = {}
|
|
_jwks_cache_time: float = 0
|
|
_JWKS_CACHE_TTL = 3600 # 1h
|
|
|
|
|
|
def _keycloak_issuer() -> str:
|
|
return f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
|
|
|
|
|
|
def _keycloak_jwks_url() -> str:
|
|
return f"{_keycloak_issuer()}/protocol/openid-connect/certs"
|
|
|
|
|
|
async def _get_jwks() -> dict:
|
|
"""Fetch or return cached JWKS from Keycloak."""
|
|
global _jwks_cache, _jwks_cache_time
|
|
|
|
if _jwks_cache and (time.time() - _jwks_cache_time) < _JWKS_CACHE_TTL:
|
|
return _jwks_cache
|
|
|
|
url = _keycloak_jwks_url()
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
resp = await client.get(url)
|
|
if resp.status_code == 200:
|
|
_jwks_cache = resp.json()
|
|
_jwks_cache_time = time.time()
|
|
logger.info("JWKS refreshed from %s (%d keys)", url, len(_jwks_cache.get("keys", [])))
|
|
return _jwks_cache
|
|
else:
|
|
logger.error("JWKS fetch failed: HTTP %s from %s", resp.status_code, url)
|
|
except Exception:
|
|
logger.exception("JWKS fetch error from %s", url)
|
|
|
|
return _jwks_cache # Return stale cache if refresh fails
|
|
|
|
|
|
def _is_auth_enabled() -> bool:
|
|
"""Auth ist nur aktiv wenn alle drei Keycloak-Settings gesetzt sind."""
|
|
return bool(
|
|
settings.keycloak_url
|
|
and settings.keycloak_realm
|
|
and settings.keycloak_client_id
|
|
)
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Token-Extraktion und Validierung
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _extract_token(request: Request) -> Optional[str]:
|
|
"""Extrahiere Bearer-Token aus Authorization-Header oder Cookie."""
|
|
auth = request.headers.get("authorization", "")
|
|
if auth.startswith("Bearer "):
|
|
return auth[7:]
|
|
# Fallback: Cookie (für Browser-Redirects nach Keycloak-Login)
|
|
return request.cookies.get("access_token")
|
|
|
|
|
|
async def _validate_token(token: str) -> Optional[dict]:
|
|
"""Validiere JWT gegen Keycloak-JWKS. Returns Payload oder None."""
|
|
try:
|
|
from jose import jwt, JWTError, ExpiredSignatureError
|
|
|
|
jwks = await _get_jwks()
|
|
if not jwks or "keys" not in jwks:
|
|
logger.warning("No JWKS available for token validation")
|
|
return None
|
|
|
|
# Decode Header um den Key-ID (kid) zu finden
|
|
unverified_header = jwt.get_unverified_header(token)
|
|
kid = unverified_header.get("kid")
|
|
|
|
# Finde den passenden Public Key
|
|
rsa_key = None
|
|
for key in jwks.get("keys", []):
|
|
if key.get("kid") == kid:
|
|
rsa_key = key
|
|
break
|
|
|
|
if not rsa_key:
|
|
logger.warning("JWT kid %s not found in JWKS", kid)
|
|
return None
|
|
|
|
# Keycloak setzt aud="account" für Public Clients, nicht den
|
|
# client_id. Prüfe azp (authorized party) statt aud, und
|
|
# deaktiviere den strikten aud-Check.
|
|
payload = jwt.decode(
|
|
token,
|
|
rsa_key,
|
|
algorithms=["RS256"],
|
|
issuer=_keycloak_issuer(),
|
|
options={"verify_exp": True, "verify_aud": False},
|
|
)
|
|
|
|
# azp muss unserem Client entsprechen
|
|
if payload.get("azp") != settings.keycloak_client_id:
|
|
logger.warning("JWT azp %s != expected %s", payload.get("azp"), settings.keycloak_client_id)
|
|
return None
|
|
|
|
return {
|
|
"sub": payload.get("sub"),
|
|
"email": payload.get("email", ""),
|
|
"name": payload.get("preferred_username", payload.get("name", "")),
|
|
"roles": payload.get("realm_access", {}).get("roles", []),
|
|
}
|
|
|
|
except ExpiredSignatureError:
|
|
logger.debug("JWT expired")
|
|
return None
|
|
except JWTError as e:
|
|
logger.debug("JWT validation failed: %s", e)
|
|
return None
|
|
except ImportError:
|
|
logger.error("python-jose not installed — JWT validation disabled")
|
|
return None
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# FastAPI Dependencies
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def get_current_user(request: Request) -> Optional[dict]:
|
|
"""Optionale Auth — gibt User-Dict oder None zurück.
|
|
|
|
Für Endpoints die sowohl mit als auch ohne Login funktionieren
|
|
(z.B. UI-Personalisierung, Bookmark-Anzeige).
|
|
"""
|
|
if not _is_auth_enabled():
|
|
return None
|
|
|
|
token = _extract_token(request)
|
|
if not token:
|
|
return None
|
|
|
|
return await _validate_token(token)
|
|
|
|
|
|
async def require_auth(request: Request) -> dict:
|
|
"""Pflicht-Auth — gibt User-Dict oder HTTP 401.
|
|
|
|
Für Write-Endpoints (POST analyze, index).
|
|
Wenn Auth nicht konfiguriert ist: ALLE durchlassen (Dev-Modus).
|
|
"""
|
|
if not _is_auth_enabled():
|
|
return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": []}
|
|
|
|
token = _extract_token(request)
|
|
if not token:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Anmeldung erforderlich",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
user = await _validate_token(token)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Token ungültig oder abgelaufen",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
async def require_admin(request: Request) -> dict:
|
|
"""Admin-Auth — gibt User-Dict oder HTTP 403.
|
|
|
|
Prüft ob der User die Rolle 'admin' oder 'gwoe-admin' hat.
|
|
Im Dev-Modus (Auth deaktiviert): durchlassen.
|
|
Für: Batch-Analyse, Programm-Indexierung, Assessment-Löschung.
|
|
"""
|
|
if not _is_auth_enabled():
|
|
return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]}
|
|
|
|
user = await require_auth(request)
|
|
roles = user.get("roles", [])
|
|
if "admin" in roles or "gwoe-admin" in roles:
|
|
return user
|
|
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Admin-Berechtigung erforderlich",
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Auth-Info-Endpoint
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def keycloak_login_url(redirect_uri: str) -> str:
|
|
"""Baut die Keycloak-Login-URL für den Browser-Redirect."""
|
|
if not _is_auth_enabled():
|
|
return ""
|
|
from urllib.parse import quote
|
|
return (
|
|
f"{_keycloak_issuer()}/protocol/openid-connect/auth"
|
|
f"?client_id={settings.keycloak_client_id}"
|
|
f"&redirect_uri={quote(redirect_uri)}"
|
|
f"&response_type=code"
|
|
f"&scope=openid profile email"
|
|
)
|