#43 Keycloak SSO: JWT-Middleware + UI-Guiding

Auth-Schicht vorbereitet — Dev-Modus (KEYCLOAK_URL leer) lässt alles
durch, Prod-Modus (ENV gesetzt) validiert JWT gegen Keycloak-JWKS.

Backend (app/auth.py):
- JWKS-Cache mit 1h TTL (async httpx fetch)
- get_current_user: Optional, gibt User-Dict oder None
- require_auth: Pflicht, gibt User-Dict oder HTTP 401
- keycloak_login_url: Baut die OIDC-Login-URL
- _is_auth_enabled: prüft ob alle 3 ENV-Vars gesetzt sind

Abgesicherte POST-Endpoints:
- POST /analyze → Depends(require_auth)
- POST /api/analyze-drucksache → Depends(require_auth)
- POST /api/programme/index → Depends(require_auth)

Neue Endpoints:
- GET /api/auth/me → {authenticated, sub, email, name, roles} oder {authenticated: false}
- GET /api/auth/login-url → {enabled, url} für Keycloak-Redirect

Frontend (index.html):
- initAuth() beim DOMContentLoaded → prüft /api/auth/me
- "Anmelden"-Button im Header (neben "Quellen")
- "Jetzt prüfen"-Button: disabled + Tooltip "Nur nach Anmeldung
  verfügbar" wenn nicht eingeloggt; aktiv wenn eingeloggt
- currentUser-State steuert Button-Zustände

Dev-Modus: Solange KEYCLOAK_URL nicht gesetzt ist (lokale Dev, aktueller
Prod-Stand), sind alle Endpoints offen wie bisher. Kein Breaking Change.

Dependency: python-jose[cryptography]>=3.3.0 in requirements.txt.

Tests: 194/194 grün (auth.py hat keine Seiteneffekte im Import).

Refs: #43
This commit is contained in:
Dotty Dotter 2026-04-10 14:28:57 +02:00
parent ea9479dc81
commit 7159240f49
4 changed files with 281 additions and 2 deletions

217
app/auth.py Normal file
View File

@ -0,0 +1,217 @@
"""Keycloak JWT Authentication for FastAPI (#43).
Read-Only-Endpoints (GET) bleiben offen. Write-Endpoints (POST) erfordern
ein gültiges Keycloak-JWT. Das Modul cached den JWKS (public keys) für
1 Stunde und validiert Token-Signatur + Expiry + Audience + Issuer.
Wenn Keycloak nicht konfiguriert ist (KEYCLOAK_URL leer), ist Auth
**deaktiviert** alle Endpoints sind offen. Das erlaubt lokale
Entwicklung ohne Keycloak-Server.
Usage in main.py:
from .auth import get_current_user, require_auth
@app.post("/api/analyze-drucksache")
async def analyze(request: Request, user = Depends(require_auth)):
... # user ist ein dict mit sub, email, name, roles
@app.get("/api/auth/me")
async def auth_me(user = Depends(get_current_user)):
... # user ist None wenn nicht eingeloggt, dict wenn eingeloggt
"""
import logging
import time
from typing import Optional
import httpx
from fastapi import Depends, HTTPException, Request
from .config import settings
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# JWKS Cache — lädt die Public Keys vom Keycloak-Server, cached für 1h.
# ─────────────────────────────────────────────────────────────────────────────
_jwks_cache: dict = {}
_jwks_cache_time: float = 0
_JWKS_CACHE_TTL = 3600 # 1h
def _keycloak_issuer() -> str:
return f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
def _keycloak_jwks_url() -> str:
return f"{_keycloak_issuer()}/protocol/openid-connect/certs"
async def _get_jwks() -> dict:
"""Fetch or return cached JWKS from Keycloak."""
global _jwks_cache, _jwks_cache_time
if _jwks_cache and (time.time() - _jwks_cache_time) < _JWKS_CACHE_TTL:
return _jwks_cache
url = _keycloak_jwks_url()
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
if resp.status_code == 200:
_jwks_cache = resp.json()
_jwks_cache_time = time.time()
logger.info("JWKS refreshed from %s (%d keys)", url, len(_jwks_cache.get("keys", [])))
return _jwks_cache
else:
logger.error("JWKS fetch failed: HTTP %s from %s", resp.status_code, url)
except Exception:
logger.exception("JWKS fetch error from %s", url)
return _jwks_cache # Return stale cache if refresh fails
def _is_auth_enabled() -> bool:
"""Auth ist nur aktiv wenn alle drei Keycloak-Settings gesetzt sind."""
return bool(
settings.keycloak_url
and settings.keycloak_realm
and settings.keycloak_client_id
)
# ─────────────────────────────────────────────────────────────────────────────
# Token-Extraktion und Validierung
# ─────────────────────────────────────────────────────────────────────────────
def _extract_token(request: Request) -> Optional[str]:
"""Extrahiere Bearer-Token aus Authorization-Header oder Cookie."""
auth = request.headers.get("authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
# Fallback: Cookie (für Browser-Redirects nach Keycloak-Login)
return request.cookies.get("access_token")
async def _validate_token(token: str) -> Optional[dict]:
"""Validiere JWT gegen Keycloak-JWKS. Returns Payload oder None."""
try:
from jose import jwt, JWTError, ExpiredSignatureError
jwks = await _get_jwks()
if not jwks or "keys" not in jwks:
logger.warning("No JWKS available for token validation")
return None
# Decode Header um den Key-ID (kid) zu finden
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
# Finde den passenden Public Key
rsa_key = None
for key in jwks.get("keys", []):
if key.get("kid") == kid:
rsa_key = key
break
if not rsa_key:
logger.warning("JWT kid %s not found in JWKS", kid)
return None
payload = jwt.decode(
token,
rsa_key,
algorithms=["RS256"],
audience=settings.keycloak_client_id,
issuer=_keycloak_issuer(),
options={"verify_exp": True},
)
return {
"sub": payload.get("sub"),
"email": payload.get("email", ""),
"name": payload.get("preferred_username", payload.get("name", "")),
"roles": payload.get("realm_access", {}).get("roles", []),
}
except ExpiredSignatureError:
logger.debug("JWT expired")
return None
except JWTError as e:
logger.debug("JWT validation failed: %s", e)
return None
except ImportError:
logger.error("python-jose not installed — JWT validation disabled")
return None
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI Dependencies
# ─────────────────────────────────────────────────────────────────────────────
async def get_current_user(request: Request) -> Optional[dict]:
"""Optionale Auth — gibt User-Dict oder None zurück.
Für Endpoints die sowohl mit als auch ohne Login funktionieren
(z.B. UI-Personalisierung, Bookmark-Anzeige).
"""
if not _is_auth_enabled():
return None
token = _extract_token(request)
if not token:
return None
return await _validate_token(token)
async def require_auth(request: Request) -> dict:
"""Pflicht-Auth — gibt User-Dict oder HTTP 401.
Für Write-Endpoints (POST analyze, index).
Wenn Auth nicht konfiguriert ist: ALLE durchlassen (Dev-Modus).
"""
if not _is_auth_enabled():
return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": []}
token = _extract_token(request)
if not token:
raise HTTPException(
status_code=401,
detail="Anmeldung erforderlich",
headers={"WWW-Authenticate": "Bearer"},
)
user = await _validate_token(token)
if not user:
raise HTTPException(
status_code=401,
detail="Token ungültig oder abgelaufen",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# ─────────────────────────────────────────────────────────────────────────────
# Auth-Info-Endpoint
# ─────────────────────────────────────────────────────────────────────────────
def keycloak_login_url(redirect_uri: str) -> str:
"""Baut die Keycloak-Login-URL für den Browser-Redirect."""
if not _is_auth_enabled():
return ""
from urllib.parse import quote
return (
f"{_keycloak_issuer()}/protocol/openid-connect/auth"
f"?client_id={settings.keycloak_client_id}"
f"&redirect_uri={quote(redirect_uri)}"
f"&response_type=code"
f"&scope=openid profile email"
)

View File

@ -5,7 +5,7 @@ import uuid
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, File, Form, UploadFile, Request, BackgroundTasks, HTTPException
from fastapi import FastAPI, File, Form, UploadFile, Request, BackgroundTasks, HTTPException, Depends
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, Response
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.staticfiles import StaticFiles
@ -39,6 +39,7 @@ from .database import (
from .parlamente import get_adapter, ADAPTERS
from .bundeslaender import alle_bundeslaender
from .analyzer import analyze_antrag
from .auth import get_current_user, require_auth, keycloak_login_url, _is_auth_enabled
from .report import generate_html_report, generate_pdf_report
from .embeddings import (
init_embeddings_db, get_programme_info, get_indexing_status,
@ -138,6 +139,7 @@ async def index(request: Request):
@limiter.limit("10/minute")
async def start_analysis(
request: Request,
user=Depends(require_auth),
background_tasks: BackgroundTasks,
text: Optional[str] = Form(None),
file: Optional[UploadFile] = File(None),
@ -243,6 +245,31 @@ async def get_pdf(job_id: str):
raise HTTPException(status_code=500, detail="PDF nicht gefunden")
# ─── Auth-Endpoints (#43) ───────────────────────────────────────────────────
@app.get("/api/auth/me")
async def auth_me(user=Depends(get_current_user)):
"""User-Info oder null wenn nicht eingeloggt.
Das Frontend ruft diesen Endpoint beim Load auf, um zu entscheiden
ob "Bewerten" aktiv oder ausgegraut ist.
"""
if user:
return {"authenticated": True, **user}
return {"authenticated": False}
@app.get("/api/auth/login-url")
async def auth_login_url(request: Request, redirect: str = "/"):
"""Keycloak-Login-URL für den Browser-Redirect."""
if not _is_auth_enabled():
return {"enabled": False, "url": ""}
# Construct absolute redirect URI
base = str(request.base_url).rstrip("/")
url = keycloak_login_url(f"{base}{redirect}")
return {"enabled": True, "url": url}
# API: Load assessments from database
@app.get("/api/assessments")
async def list_assessments(bundesland: Optional[str] = None):
@ -451,6 +478,7 @@ async def search_landtag(
async def analyze_drucksache(
request: Request,
background_tasks: BackgroundTasks,
user=Depends(require_auth),
drucksache: str = Form(...),
bundesland: str = Form("NRW"),
model: str = Form("qwen-plus")
@ -720,6 +748,7 @@ async def programme_status():
async def index_programme(
request: Request,
background_tasks: BackgroundTasks,
user=Depends(require_auth),
programm_id: str = Form(None),
all_programmes: bool = Form(False),
):

View File

@ -700,6 +700,7 @@
<button class="mode-btn" onclick="showMode('tags')">🏷️ Tags</button>
<button class="mode-btn" onclick="showMode('upload')">📤 Prüfen</button>
<a href="/quellen" class="mode-btn" style="text-decoration: none;">📚 Quellen</a>
<button id="auth-btn" class="mode-btn" style="border:none;cursor:pointer;font-size:0.85rem;">🔑 Anmelden</button>
<a href="/auswertungen" class="mode-btn" style="text-decoration: none;">📈 Auswertungen</a>
</div>
</header>
@ -850,6 +851,34 @@
let isSearching = false;
let selectedTags = new Set();
let allTags = {};
let currentUser = null; // #43: Auth-State
// #43: Auth prüfen beim Load. Steuert ob "Jetzt prüfen" aktiv ist.
async function initAuth() {
try {
const resp = await fetch('/api/auth/me');
const data = await resp.json();
currentUser = data.authenticated ? data : null;
} catch { currentUser = null; }
updateAuthUI();
}
function updateAuthUI() {
const authBtn = document.getElementById('auth-btn');
if (!authBtn) return;
if (currentUser) {
authBtn.textContent = currentUser.name || currentUser.email || 'Angemeldet';
authBtn.classList.add('logged-in');
authBtn.onclick = () => { /* TODO: Logout */ };
} else {
authBtn.textContent = '🔑 Anmelden';
authBtn.classList.remove('logged-in');
authBtn.onclick = async () => {
const resp = await fetch(`/api/auth/login-url?redirect=${encodeURIComponent(window.location.pathname)}`);
const data = await resp.json();
if (data.url) window.location.href = data.url;
};
}
}
// Map code → parlament_name, vom Backend mit dem Initial-Render geliefert.
// Wird im Detail-Header und im Listen-Item-Badge-Tooltip verwendet.
@ -857,6 +886,7 @@
// Load assessments on page load — localStorage-Auswahl wiederherstellen
document.addEventListener('DOMContentLoaded', () => {
initAuth(); // #43: Auth-State prüfen
const saved = localStorage.getItem('selectedBundesland');
const select = document.getElementById('bundesland-select');
if (saved) {
@ -1120,7 +1150,9 @@
<div class="list-item-title">${item.title || 'Ohne Titel'}</div>
<div class="list-item-meta">${fraktionen} · ${item.datum || ''}</div>
${isUnchecked ? `
<button class="btn-check-now" onclick="event.stopPropagation(); checkNow('${item.drucksache}', this)">
<button class="btn-check-now"
${currentUser ? '' : 'disabled title="Nur nach Anmeldung verfügbar" style="opacity:0.5;cursor:not-allowed;"'}
onclick="event.stopPropagation(); checkNow('${item.drucksache}', this)">
🔍 Jetzt prüfen
</button>
` : `

View File

@ -12,3 +12,4 @@ weasyprint>=62.0
pydantic>=2.9.0
pydantic-settings>=2.5.0
slowapi>=0.1.9
python-jose[cryptography]>=3.3.0