"""GWÖ-Antragsprüfer — FastAPI Webapp.""" import logging import uuid from pathlib import Path from typing import Optional from fastapi import FastAPI, File, Form, UploadFile, Request, BackgroundTasks, HTTPException, Depends from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, Response from starlette.middleware.base import BaseHTTPMiddleware from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from .validators import ( MAX_SEARCH_QUERY_LEN, validate_drucksache, validate_search_query, ) # Strukturiertes Logging für die ganze App. uvicorn registriert seinen # eigenen Root-Handler erst beim Start; wir setzen ein neutrales Format # für unsere Module früh, damit logger.exception() auch beim Boot greift. logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(name)s: %(message)s", ) logger = logging.getLogger(__name__) from .config import settings from .database import ( init_db, get_job, create_job, update_job, get_all_assessments, get_assessment, delete_assessment, upsert_assessment, import_json_assessments, search_assessments, toggle_bookmark, get_bookmarks, add_comment, get_comments, delete_comment, ) from .parlamente import get_adapter, ADAPTERS from .bundeslaender import alle_bundeslaender from .analyzer import analyze_antrag from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, _is_auth_enabled def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str: """Wähle den besten Titel aus LLM-Output und Adapter-Metadata. Priorität: 1. doc_title, wenn ein echter Titel (nicht "Drucksache XX") 2. llm_title, wenn nicht leer und nicht generisch 3. Generischer Fallback "Drucksache XX" """ generic_prefix = f"Drucksache {drucksache.split('/')[0]}" # doc_title gut? (nicht generisch, nicht leer) if doc_title and not doc_title.startswith("Drucksache ") and len(doc_title) > 5: return doc_title # LLM-Titel gut? (nicht generisch) if llm_title and not llm_title.startswith("Drucksache ") and len(llm_title) > 5: return llm_title # doc_title als Fallback (auch wenn generisch) return doc_title or llm_title or f"Drucksache {drucksache}" from .report import generate_html_report, generate_pdf_report from .embeddings import ( init_embeddings_db, get_programme_info, get_indexing_status, index_programm, render_highlighted_page, PROGRAMME, ) app = FastAPI( title=settings.app_name, version=settings.app_version, docs_url=None, # Disable /docs in production redoc_url=None, # Disable /redoc in production openapi_url=None, # Disable /openapi.json in production ) # Rate-Limiter — fängt Resource-Exhaustion auf den teuren POST-Endpoints # (LLM-Calls + Indexing). Issue #57 Befund #1 (HIGH). Default in-memory # Storage; für mehrere Worker müsste man auf Redis umstellen, solange wir # auf einem Container laufen reicht das Default-Storage. limiter = Limiter(key_func=get_remote_address, default_limits=[]) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # Security Headers Middleware class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()" # CSP: Allow self, inline styles (for templates), and PDF viewer response.headers["Content-Security-Policy"] = ( "default-src 'self'; " "style-src 'self' 'unsafe-inline'; " "script-src 'self' 'unsafe-inline'; " "img-src 'self' data:; " "frame-ancestors 'none';" ) return response app.add_middleware(SecurityHeadersMiddleware) # Setup directories settings.data_dir.mkdir(exist_ok=True) settings.reports_dir.mkdir(exist_ok=True) # Static files and templates static_dir = Path(__file__).parent / "static" templates_dir = Path(__file__).parent / "templates" static_dir.mkdir(exist_ok=True) templates_dir.mkdir(exist_ok=True) app.mount("/static", StaticFiles(directory=static_dir), name="static") templates = Jinja2Templates(directory=str(templates_dir)) @app.on_event("startup") async def startup(): await init_db() init_embeddings_db() # Job-Queue Worker starten (#95) from .queue import start_worker, re_enqueue_pending await re_enqueue_pending() start_worker() @app.on_event("shutdown") async def shutdown(): """Graceful Shutdown: warte auf laufende Queue-Jobs bevor der Container stirbt.""" from .queue import graceful_shutdown await graceful_shutdown(timeout=900) # 15 min — passend zu stop_grace_period # JSON import disabled - all assessments now live in SQLite DB only # Legacy import would overwrite new v5 assessments with old format # count = await import_json_assessments(settings.data_dir / "assessments") # if count > 0: # print(f"Imported {count} assessments from JSON files") @app.get("/", response_class=HTMLResponse) async def index(request: Request): """Landing page with upload form.""" # Frontend-Liste: synthetischer "ALL"-Eintrag (Bundesweit) zuerst, dann # die echten Bundesländer aus der Konfig. Der "ALL"-Code ist eine reine # Frontend/API-Konvention, kein Eintrag in bundeslaender.py. bl_list = [{"code": "ALL", "name": "🌍 Bundesweit", "active": True}] bl_list.extend( {"code": bl.code, "name": bl.name, "active": bl.aktiv} for bl in alle_bundeslaender() ) # Map code → parlament_name, damit das Frontend ohne extra Backend-Call # für jeden Antrag den Parlamentsnamen anzeigen kann. parlament_names = { bl.code: bl.parlament_name for bl in alle_bundeslaender() } from .models import MATRIX_LABELS return templates.TemplateResponse("index.html", { "request": request, "app_name": settings.app_name, "bundeslaender": bl_list, "parlament_names": parlament_names, "matrix_labels": MATRIX_LABELS, "matrix_explanations": { "A1": "Wenn Ihre Stadt Büromöbel kauft: Wurden die unter menschenwürdigen Bedingungen hergestellt? Oder in einer Fabrik, in der Arbeiter:innen ausgebeutet werden? Hier geht es darum, ob die öffentliche Hand beim Einkauf auf Menschenrechte achtet.", "A2": "Beauftragt die Stadt den Handwerksbetrieb aus dem Ort — oder den billigsten Konzern aus dem Ausland? Bleibt das Geld in der Region und schafft Arbeitsplätze vor Ort?", "A3": "Werden bei öffentlichen Aufträgen Klimastandards verlangt? Kommt das Schulessen von regionalen Bauernhöfen oder wird es quer durch Europa gekarrt?", "A4": "Verdienen die Reinigungskräfte im Rathaus einen fairen Lohn? Haben Subunternehmer die gleichen Arbeitsbedingungen wie Festangestellte?", "A5": "Können Sie als Bürger:in nachschauen, welche Firma den Auftrag für den Straßenbau bekommen hat — und warum? Oder passiert das alles hinter verschlossenen Türen?", "B1": "Liegt das Geld Ihrer Stadt bei einer Bank, die auch Waffengeschäfte finanziert? Oder bei einer ethischen Bank, die in soziale Projekte investiert?", "B2": "Fließen Ihre Steuergelder in einen neuen Radweg für alle — oder in eine Umgehungsstraße, die nur dem Gewerbegebiet nützt?", "B3": "Investiert Ihre Kommune in Solaranlagen auf Schuldächern? Oder wird das Geld in klimaschädliche Projekte gesteckt?", "B4": "Bekommen ärmere Stadtteile genauso viel Geld für Spielplätze und Schulen wie reiche? Oder konzentrieren sich die Investitionen dort, wo die Grundstückspreise schon hoch sind?", "B5": "Gibt es einen Bürgerhaushalt, bei dem Sie mitbestimmen können, ob das Geld in die Bibliothek oder den Sportplatz fließt? Oder entscheidet das der Stadtrat allein?", "C1": "Werden in Ihrer Stadtverwaltung Frauen gleich bezahlt? Haben Menschen mit Behinderung faire Chancen auf eine Stelle? Gibt es Schutz vor Mobbing?", "C2": "Hat Ihre Stadt ein Klimaschutzkonzept, das alle Ämter gemeinsam umsetzen? Oder kocht jedes Amt sein eigenes Süppchen?", "C3": "Fahren die Mitarbeiter:innen des Rathauses mit dem Dienstrad oder dem SUV? Gibt es vegetarisches Essen in der Kantine?", "C4": "Können Eltern in der Verwaltung Teilzeit arbeiten, ohne Karrierenachteile? Gibt es flexible Arbeitszeiten für pflegende Angehörige?", "C5": "Können Sie die Sitzungsprotokolle des Stadtrats online lesen? Verstehen Sie, warum Entscheidungen so und nicht anders gefallen sind?", "D1": "Werden Sie auf dem Amt fair behandelt — egal ob Sie einen deutschen oder ausländischen Namen haben? Schützt die Polizei alle gleich?", "D2": "Profitiert die ganze Stadt von dem Antrag — oder nur ein Stadtteil, eine Altersgruppe, eine Einkommensschicht?", "D3": "Kommt der Strom für die Straßenbeleuchtung aus Erneuerbaren? Wird das Regenwasser im Park versickert statt in die Kanalisation geleitet?", "D4": "Kann sich die alleinerziehende Mutter den Kitaplatz leisten? Bekommt der Rentner noch einen Arzttermin? Findet die Familie mit drei Kindern eine bezahlbare Wohnung?", "D5": "Werden Sie gefragt, bevor die Straße vor Ihrem Haus umgebaut wird? Gibt es Bürgerversammlungen, Online-Beteiligung, Jugendparlamente?", "E1": "Hinterlassen wir unseren Enkeln einen Schuldenberg und versiegelte Flächen? Oder investieren wir heute so, dass auch 2050 noch gute Lebensbedingungen herrschen?", "E2": "Hilft der Antrag nur Ihrer Stadt — oder auch den Nachbargemeinden? Gibt es regionale Kooperationen, von denen alle profitieren?", "E3": "Denkt Ihre Kommune beim Einkauf auch an den CO₂-Fußabdruck? An die Abholzung von Regenwäldern für billiges Papier? An den Wasserverbrauch in Dürregebieten?", "E4": "Unterstützt Ihre Stadt strukturschwache Regionen? Gibt es Partnerschaften mit Kommunen im globalen Süden?", "E5": "Setzt sich Ihre Kommune für mehr Demokratie ein — auch auf Landes- und Bundesebene? Werden internationale Abkommen unterstützt?", }, }) @app.post("/analyze") @limiter.limit("10/minute") async def start_analysis( request: Request, background_tasks: BackgroundTasks, text: Optional[str] = Form(None), file: Optional[UploadFile] = File(None), bundesland: str = Form("NRW"), model: str = Form("qwen-plus"), user: dict = Depends(require_auth), ): """Start analysis job.""" if not text and not file: raise HTTPException(status_code=400, detail="Entweder Text oder PDF-Datei erforderlich") # Extract text from PDF if uploaded if file and file.filename: import fitz # PyMuPDF pdf_bytes = await file.read() doc = fitz.open(stream=pdf_bytes, filetype="pdf") text = "" for page in doc: text += page.get_text() doc.close() # Create job job_id = str(uuid.uuid4()) await create_job(job_id, text[:500], bundesland, model) # Start background analysis background_tasks.add_task(run_analysis, job_id, text, bundesland, model) return JSONResponse({"job_id": job_id, "status": "queued"}) async def run_analysis(job_id: str, text: str, bundesland: str, model: str): """Background task for analysis.""" try: await update_job(job_id, status="processing") # Run LLM analysis assessment = await analyze_antrag(text, bundesland, model) # Generate reports html_path = settings.reports_dir / f"{job_id}.html" pdf_path = settings.reports_dir / f"{job_id}.pdf" await generate_html_report(assessment, html_path, bundesland=bundesland) await generate_pdf_report(assessment, pdf_path, bundesland=bundesland) await update_job( job_id, status="completed", result=assessment.model_dump_json(), html_path=str(html_path), pdf_path=str(pdf_path), ) except Exception as e: await update_job(job_id, status="failed", error=str(e)) @app.get("/status/{job_id}") async def get_status(job_id: str): """Get job status.""" job = await get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job nicht gefunden") return JSONResponse({ "job_id": job_id, "status": job["status"], "created_at": job["created_at"], }) @app.get("/result/{job_id}", response_class=HTMLResponse) async def get_result(request: Request, job_id: str): """Get analysis result as HTML.""" job = await get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job nicht gefunden") if job["status"] != "completed": raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {job['status']}") html_path = Path(job["html_path"]) if html_path.exists(): return HTMLResponse(html_path.read_text()) raise HTTPException(status_code=500, detail="Report nicht gefunden") @app.get("/result/{job_id}/pdf") async def get_pdf(job_id: str): """Download PDF report.""" job = await get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job nicht gefunden") if job["status"] != "completed": raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {job['status']}") pdf_path = Path(job["pdf_path"]) if pdf_path.exists(): return FileResponse( pdf_path, media_type="application/pdf", filename=f"gwoe-bericht-{job_id[:8]}.pdf" ) raise HTTPException(status_code=500, detail="PDF nicht gefunden") # ─── Queue-Status (#95) ───────────────────────────────────────────────────── @app.get("/api/queue/status") async def queue_status(): """Aktueller Queue-Stand: wartende Jobs, geschätzte Wartezeit.""" from .queue import get_queue_status return get_queue_status() # ─── Auth-Endpoints (#43) ─────────────────────────────────────────────────── @app.get("/api/auth/me") async def auth_me(user=Depends(get_current_user)): """User-Info oder null wenn nicht eingeloggt. Das Frontend ruft diesen Endpoint beim Load auf, um zu entscheiden ob "Bewerten" aktiv oder ausgegraut ist. """ if user: return {"authenticated": True, **user} return {"authenticated": False} @app.get("/api/auth/callback") async def auth_callback(request: Request, code: str = "", state: str = ""): """OIDC Authorization Code → Access Token Exchange. Keycloak redirects hierher nach Login mit ?code=... Parameter. Wir tauschen den Code gegen ein Access Token und setzen es als Cookie. """ if not _is_auth_enabled() or not code: from fastapi.responses import RedirectResponse return RedirectResponse("/") from .auth import _keycloak_issuer token_url = f"{_keycloak_issuer()}/protocol/openid-connect/token" # Construct the same redirect_uri used for the auth request base = str(request.base_url).rstrip("/").replace("http://", "https://") redirect_uri = f"{base}/api/auth/callback" import httpx as _httpx async with _httpx.AsyncClient(timeout=10) as client: resp = await client.post(token_url, data={ "grant_type": "authorization_code", "client_id": settings.keycloak_client_id, "code": code, "redirect_uri": redirect_uri, }) if resp.status_code != 200: logger.error("Token exchange failed: %s %s", resp.status_code, resp.text[:200]) raise HTTPException(status_code=401, detail="Login fehlgeschlagen") tokens = resp.json() access_token = tokens.get("access_token", "") expires_in = tokens.get("expires_in", 3600) # HTML-Response statt RedirectResponse: setzt Cookie UND redirected. # RedirectResponse mit Set-Cookie wird von manchen Browsern bei # 307/302 ignoriert (insb. hinter Reverse-Proxies). return HTMLResponse( f"""

Anmeldung erfolgreich, Weiterleitung...

""", headers={ "Set-Cookie": ( f"access_token={access_token}; Path=/; Secure; HttpOnly; " f"SameSite=Lax; Max-Age={expires_in}" ) }, ) @app.get("/api/auth/login-url") async def auth_login_url(request: Request, redirect: str = "/"): """Keycloak-Login-URL für den Browser-Redirect.""" if not _is_auth_enabled(): return {"enabled": False, "url": ""} # redirect_uri muss auf den Callback-Endpoint zeigen, nicht auf die # Zielseite — der Callback tauscht den Code gegen ein Token. base = str(request.base_url).rstrip("/").replace("http://", "https://") url = keycloak_login_url(f"{base}/api/auth/callback") return {"enabled": True, "url": url} # ─── Bookmarks + Comments (#94) ───────────────────────────────────────────── @app.post("/api/bookmark") async def bookmark_toggle( drucksache: str = Form(...), user: dict = Depends(require_auth), ): """Toggle bookmark für einen Antrag.""" is_bookmarked = await toggle_bookmark(user["sub"], drucksache) return {"bookmarked": is_bookmarked, "drucksache": drucksache} @app.get("/api/bookmarks") async def bookmarks_list(user=Depends(get_current_user)): """Liste aller Bookmarks des aktuellen Users.""" if not user: return [] return await get_bookmarks(user["sub"]) @app.post("/api/comment") async def comment_add( drucksache: str = Form(...), text: str = Form(...), visibility: str = Form("all"), user: dict = Depends(require_auth), ): """Kommentar hinzufügen.""" if len(text) > 2000: raise HTTPException(status_code=400, detail="Kommentar zu lang (max 2000 Zeichen)") return await add_comment(user["sub"], user.get("name", ""), drucksache, text, visibility) @app.get("/api/comments") async def comments_list(drucksache: str, user=Depends(get_current_user)): """Kommentare für einen Antrag.""" user_id = user["sub"] if user else None return await get_comments(drucksache, user_id) @app.delete("/api/comment/{comment_id}") async def comment_delete(comment_id: int, user: dict = Depends(require_auth)): """Eigenen Kommentar löschen.""" deleted = await delete_comment(comment_id, user["sub"]) if not deleted: raise HTTPException(status_code=404, detail="Kommentar nicht gefunden oder nicht Ihr Kommentar") return {"status": "deleted"} # ─── Registrierung (#103) ──────────────────────────────────────────────── @app.post("/api/auth/register") async def auth_register( request: Request, firstName: str = Form(...), lastName: str = Form(...), email: str = Form(...), username: str = Form(...), password: str = Form(...), ): """Registrierung: erstellt User in Keycloak mit enabled=false. Admin muss den Account manuell freischalten.""" if len(password) < 8: raise HTTPException(status_code=400, detail="Passwort muss mindestens 8 Zeichen haben") import httpx as _httpx # Admin-Token holen async with _httpx.AsyncClient(timeout=10) as client: token_resp = await client.post( "https://sso.toppyr.de/realms/master/protocol/openid-connect/token", data={ "grant_type": "password", "client_id": "admin-cli", "username": "admin", "password": "J915vI2Ankf7SdmEqe0BC5Aq", }, ) if token_resp.status_code != 200: raise HTTPException(status_code=500, detail="Keycloak-Verbindung fehlgeschlagen") admin_token = token_resp.json().get("access_token") # User anlegen (disabled) create_resp = await client.post( "https://sso.toppyr.de/admin/realms/collaboration/users", headers={"Authorization": f"Bearer {admin_token}", "Content-Type": "application/json"}, json={ "username": username, "email": email, "firstName": firstName, "lastName": lastName, "enabled": False, "credentials": [{"type": "password", "value": password, "temporary": False}], }, ) if create_resp.status_code == 409: raise HTTPException(status_code=409, detail="Benutzername oder E-Mail bereits vergeben") if create_resp.status_code != 201: raise HTTPException(status_code=500, detail="Registrierung fehlgeschlagen") return {"status": "pending_approval", "message": "Registrierung eingegangen. Ein Administrator wird Ihren Account freischalten."} @app.get("/api/auth/pending-users") async def auth_pending_users(user: dict = Depends(require_admin)): """Liste nicht-freigeschalteter User (Admin-only).""" import httpx as _httpx async with _httpx.AsyncClient(timeout=10) as client: token_resp = await client.post( "https://sso.toppyr.de/realms/master/protocol/openid-connect/token", data={"grant_type": "password", "client_id": "admin-cli", "username": "admin", "password": "J915vI2Ankf7SdmEqe0BC5Aq"}, ) admin_token = token_resp.json().get("access_token") resp = await client.get( "https://sso.toppyr.de/admin/realms/collaboration/users?enabled=false&max=50", headers={"Authorization": f"Bearer {admin_token}"}, ) users = resp.json() if resp.status_code == 200 else [] return [{"id": u["id"], "username": u.get("username"), "firstName": u.get("firstName"), "lastName": u.get("lastName"), "email": u.get("email"), "created": u.get("createdTimestamp")} for u in users] @app.post("/api/auth/approve-user") async def auth_approve_user( user_id: str = Form(...), user: dict = Depends(require_admin), ): """User freischalten (Admin-only).""" import httpx as _httpx async with _httpx.AsyncClient(timeout=10) as client: token_resp = await client.post( "https://sso.toppyr.de/realms/master/protocol/openid-connect/token", data={"grant_type": "password", "client_id": "admin-cli", "username": "admin", "password": "J915vI2Ankf7SdmEqe0BC5Aq"}, ) admin_token = token_resp.json().get("access_token") resp = await client.put( f"https://sso.toppyr.de/admin/realms/collaboration/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}", "Content-Type": "application/json"}, json={"enabled": True}, ) if resp.status_code == 204: return {"status": "approved", "user_id": user_id} raise HTTPException(status_code=500, detail="Freischaltung fehlgeschlagen") # API: Load assessments from database @app.get("/api/assessments") async def list_assessments(bundesland: Optional[str] = None): """Return assessments from database, optionally filtered by Bundesland. ``bundesland="ALL"`` and missing parameter both mean "no filter". """ rows = await get_all_assessments(bundesland) # Convert DB format to frontend format assessments = [] for row in rows: assessments.append({ "drucksache": row.get("drucksache"), "title": row.get("title"), "fraktionen": row.get("fraktionen", []), "datum": row.get("datum"), "link": row.get("link"), "bundesland": row.get("bundesland"), "gwoeScore": row.get("gwoe_score"), "gwoeBegründung": row.get("gwoe_begruendung"), "gwoeMatrix": row.get("gwoe_matrix", []), "gwoeSchwerpunkt": row.get("gwoe_schwerpunkt", []), "wahlprogrammScores": row.get("wahlprogramm_scores", []), "verbesserungen": row.get("verbesserungen", []), "stärken": row.get("staerken", []), "schwächen": row.get("schwaechen", []), "empfehlung": row.get("empfehlung"), "empfehlungSymbol": row.get("empfehlung_symbol"), "verbesserungspotenzial": row.get("verbesserungspotenzial"), "themen": row.get("themen", []), "antragZusammenfassung": row.get("antrag_zusammenfassung"), "antragKernpunkte": row.get("antrag_kernpunkte", []), "updatedAt": row.get("updated_at"), "source": row.get("source"), "model": row.get("model"), }) return assessments # API: Get single assessment (use query param for drucksache with /) @app.get("/api/assessment") async def get_single_assessment(drucksache: str): """Get a single assessment by drucksache ID.""" drucksache = validate_drucksache(drucksache) row = await get_assessment(drucksache) if not row: raise HTTPException(status_code=404, detail="Assessment nicht gefunden") return { "drucksache": row.get("drucksache"), "title": row.get("title"), "fraktionen": row.get("fraktionen", []), "datum": row.get("datum"), "link": row.get("link"), "bundesland": row.get("bundesland"), "gwoeScore": row.get("gwoe_score"), "gwoeBegründung": row.get("gwoe_begruendung"), "gwoeMatrix": row.get("gwoe_matrix", []), "gwoeSchwerpunkt": row.get("gwoe_schwerpunkt", []), "wahlprogrammScores": row.get("wahlprogramm_scores", []), "verbesserungen": row.get("verbesserungen", []), "stärken": row.get("staerken", []), "schwächen": row.get("schwaechen", []), "empfehlung": row.get("empfehlung"), "empfehlungSymbol": row.get("empfehlung_symbol"), "verbesserungspotenzial": row.get("verbesserungspotenzial"), "themen": row.get("themen", []), "antragZusammenfassung": row.get("antrag_zusammenfassung"), "antragKernpunkte": row.get("antrag_kernpunkte", []), "updatedAt": row.get("updated_at"), "source": row.get("source"), "model": row.get("model"), } # API: Delete assessment for re-analysis (#97) @app.delete("/api/assessment/delete") async def delete_assessment_endpoint( drucksache: str, user: dict = Depends(require_admin), ): """Löscht ein Assessment, damit es neu analysiert werden kann.""" drucksache = validate_drucksache(drucksache) deleted = await delete_assessment(drucksache) if not deleted: raise HTTPException(status_code=404, detail="Assessment nicht gefunden") return {"status": "deleted", "drucksache": drucksache} # API: Generate PDF on demand for an assessment @app.get("/api/assessment/pdf") async def download_assessment_pdf(drucksache: str): """Generate and download PDF for an assessment.""" from .models import Assessment drucksache = validate_drucksache(drucksache) row = await get_assessment(drucksache) if not row: raise HTTPException(status_code=404, detail="Assessment nicht gefunden") # Check if PDF already exists safe_name = drucksache.replace("/", "-") pdf_path = settings.reports_dir / f"{safe_name}.pdf" if not pdf_path.exists(): # Convert DB row to Assessment model for report generation assessment_data = { "drucksache": row.get("drucksache"), "title": row.get("title"), "fraktionen": row.get("fraktionen", []), "datum": row.get("datum"), "link": row.get("link"), "gwoe_score": row.get("gwoe_score") or 0, "gwoe_begruendung": row.get("gwoe_begruendung") or "", "gwoe_matrix": row.get("gwoe_matrix", []), "gwoe_schwerpunkt": row.get("gwoe_schwerpunkt", []), "wahlprogramm_scores": row.get("wahlprogramm_scores", []), "verbesserungen": row.get("verbesserungen", []), "staerken": row.get("staerken", []), "schwaechen": row.get("schwaechen", []), "empfehlung": row.get("empfehlung") or "", "empfehlung_symbol": row.get("empfehlung_symbol") or "", "verbesserungspotenzial": row.get("verbesserungspotenzial") or "", "themen": row.get("themen", []), "antrag_zusammenfassung": row.get("antrag_zusammenfassung") or "", "antrag_kernpunkte": row.get("antrag_kernpunkte", []), } try: assessment = Assessment(**assessment_data) await generate_pdf_report( assessment, pdf_path, bundesland=row.get("bundesland"), ) except Exception as e: raise HTTPException(status_code=500, detail=f"PDF-Generierung fehlgeschlagen: {e}") return FileResponse( pdf_path, media_type="application/pdf", filename=f"gwoe-{safe_name}.pdf" ) # API: Search internal DB only @app.get("/api/search") async def search_internal( q: str, bundesland: str = "NRW", limit: int = 50 ): """ Search internal assessments database only. """ q = validate_search_query(q) db_results = await search_assessments(q, bundesland, limit) results = [] for row in db_results: results.append({ "drucksache": row.get("drucksache"), "title": row.get("title"), "fraktionen": row.get("fraktionen", []), "datum": row.get("datum"), "link": row.get("link"), "bundesland": bundesland, "gwoeScore": row.get("gwoe_score"), "themen": row.get("themen", []), "status": "checked", }) return results # API: Search external parliament portal (Landtag) @app.get("/api/search-landtag") async def search_landtag( q: str, bundesland: str = "NRW", limit: int = 20 ): """ Search external parliament portal (e.g., NRW OPAL). Returns results that can be analyzed with "Jetzt prüfen". Requires a concrete Bundesland — the special "ALL" / Bundesweit mode cannot pick a single Landtag adapter and is rejected with HTTP 400. """ q = validate_search_query(q) if not bundesland or bundesland == "ALL": raise HTTPException( status_code=400, detail="Landtag-Suche benötigt ein konkretes Bundesland", ) adapter = get_adapter(bundesland) if not adapter: return {"error": f"Bundesland {bundesland} noch nicht unterstützt"} try: external = await adapter.search(q, limit) results = [] for doc in external: results.append({ "drucksache": doc.drucksache, "title": doc.title, "fraktionen": doc.fraktionen, "datum": doc.datum, "link": doc.link, "bundesland": bundesland, "typ": doc.typ, "gwoeScore": None, "status": "unchecked", }) return results except Exception as e: logger.exception("Landtag search error for q=%r bundesland=%s", q, bundesland) return {"error": f"Suchfehler: {str(e)}"} # API: Batch-Analyse (#44) — enqueued ungeprüfte Drucksachen eines BL @app.post("/api/batch-analyze") @limiter.limit("3/minute") async def batch_analyze( request: Request, bundesland: str = Form(...), limit: int = Form(10), user: dict = Depends(require_admin), ): """Sucht die neuesten Drucksachen im Landtag-Portal und enqueued alle, die noch nicht in der DB bewertet sind. Returns: Liste der enqueued Drucksachen + Queue-Position. """ from .queue import enqueue, QueueFullError if limit < 1 or limit > 100: raise HTTPException(status_code=400, detail="limit muss 1-100 sein") adapter = get_adapter(bundesland) if not adapter: raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt") # Neueste Drucksachen vom Landtag holen. Multiplier 10× weil die # meisten Adapter Anfragen+Gesetzentwürfe+Anträge gemischt liefern # und der Antrag-Anteil nur ~10-30% ist. drucksachen = await adapter.search("", limit=limit * 10) enqueued = [] skipped = 0 for doc in drucksachen: if len(enqueued) >= limit: break # Schon bewertet? existing = await get_assessment(doc.drucksache) if existing: skipped += 1 continue # Text herunterladen text = await adapter.download_text(doc.drucksache) if not text: continue # Enqueue job_id = str(uuid.uuid4()) await create_job(job_id, text[:500], bundesland, "qwen-plus", drucksache=doc.drucksache) try: position = await enqueue( job_id, run_drucksache_analysis, job_id, doc.drucksache, text, bundesland, "qwen-plus", doc, drucksache=doc.drucksache, ) enqueued.append({ "drucksache": doc.drucksache, "title": doc.title, "job_id": job_id, "queue_position": position, }) except QueueFullError: break return { "status": "batch_enqueued", "bundesland": bundesland, "enqueued": len(enqueued), "skipped_existing": skipped, "jobs": enqueued, } # API: Analyze a document from parliament portal @app.post("/api/analyze-drucksache") @limiter.limit("10/minute") async def analyze_drucksache( request: Request, background_tasks: BackgroundTasks, drucksache: str = Form(...), bundesland: str = Form("NRW"), model: str = Form("qwen-plus"), user: dict = Depends(require_auth), ): """ Download a document from parliament portal and analyze it. """ drucksache = validate_drucksache(drucksache) # Check if already analyzed existing = await get_assessment(drucksache) if existing: return {"status": "already_checked", "drucksache": drucksache} # Get adapter and download adapter = get_adapter(bundesland) if not adapter: raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt") # Download text text = await adapter.download_text(drucksache) if not text: raise HTTPException(status_code=404, detail=f"Dokument {drucksache} nicht gefunden") # Get document metadata doc = await adapter.get_document(drucksache) # Create job and enqueue (#95) from .queue import enqueue, QueueFullError job_id = str(uuid.uuid4()) await create_job(job_id, text[:500], bundesland, model, drucksache=drucksache) try: position = await enqueue( job_id, run_drucksache_analysis, job_id, drucksache, text, bundesland, model, doc, drucksache=drucksache, ) except QueueFullError: await update_job(job_id, status="rejected", error="Queue voll") raise HTTPException( status_code=429, detail="Analyse-Queue ist voll. Bitte später erneut versuchen.", headers={"Retry-After": "60"}, ) return { "status": "queued", "job_id": job_id, "drucksache": drucksache, "queue_position": position, } async def run_drucksache_analysis( job_id: str, drucksache: str, text: str, bundesland: str, model: str, doc ): """Background task for drucksache analysis.""" try: await update_job(job_id, status="processing") # Run LLM analysis assessment = await analyze_antrag(text, bundesland, model) # Prepare data for DB assessment_data = { "drucksache": drucksache, # Titel-Priorität: LLM-generierter Titel > doc.title, # ABER nur wenn doc.title ein echter Titel ist (nicht "Drucksache XX", # wie NRW's get_document zurückgibt). Sonst überschreibt der # generische doc.title den echten LLM-Titel. "title": _pick_best_title(assessment.title, doc.title if doc else None, drucksache), "fraktionen": assessment.fraktionen, "datum": assessment.datum or (doc.datum if doc else ""), "link": doc.link if doc else "", "bundesland": bundesland, "gwoeScore": assessment.gwoe_score, "gwoeBegründung": assessment.gwoe_begruendung, "gwoeMatrix": [m.model_dump() for m in assessment.gwoe_matrix], "gwoeSchwerpunkt": assessment.gwoe_schwerpunkt, "wahlprogrammScores": [w.model_dump() for w in assessment.wahlprogramm_scores], "verbesserungen": [v.model_dump() for v in assessment.verbesserungen], "stärken": assessment.staerken, "schwächen": assessment.schwaechen, "empfehlung": assessment.empfehlung, "empfehlungSymbol": assessment.empfehlung_symbol, "verbesserungspotenzial": assessment.verbesserungspotenzial, "themen": assessment.themen, "antragZusammenfassung": assessment.antrag_zusammenfassung, "antragKernpunkte": assessment.antrag_kernpunkte, "source": "webapp", "model": model, } # Save to DB await upsert_assessment(assessment_data) # Generate reports html_path = settings.reports_dir / f"{job_id}.html" pdf_path = settings.reports_dir / f"{job_id}.pdf" await generate_html_report(assessment, html_path, bundesland=bundesland) await generate_pdf_report(assessment, pdf_path, bundesland=bundesland) await update_job( job_id, status="completed", result=assessment.model_dump_json(), html_path=str(html_path), pdf_path=str(pdf_path), ) except Exception as e: # Volltext-Stack via logger.exception, NICHT via print — landet so im # strukturierten Container-Log und wird vom logging-Framework formatiert logger.exception("run_drucksache_analysis failed for drucksache=%s", drucksache) await update_job(job_id, status="failed", error=str(e)) # API: List available Bundesländer @app.get("/api/bundeslaender") async def list_bundeslaender(): """List available bundesländer with their status. Includes the synthetic "ALL" / Bundesweit entry as the first item so that the frontend can render it directly. ``parlament_name`` is added so the detail view can show the source parliament without an extra backend round-trip. """ out = [{ "code": "ALL", "name": "🌍 Bundesweit", "parlament_name": None, "active": True, }] out.extend({ "code": bl.code, "name": bl.name, "parlament_name": bl.parlament_name, "active": bl.aktiv, } for bl in alle_bundeslaender()) return out # === Quellen / Programme === @app.get("/methodik", response_class=HTMLResponse) async def methodik_page(request: Request): """Transparenz-/Methodik-Seite (#96).""" from .bundeslaender import aktive_bundeslaender, BUNDESLAENDER from .embeddings import get_indexing_status bl_list = [] for bl in aktive_bundeslaender(): bl_list.append({ "code": bl.code, "name": bl.name, "doku_system": bl.doku_system, }) status = get_indexing_status() return templates.TemplateResponse("methodik.html", { "request": request, "app_name": settings.app_name, "adapter_count": len(ADAPTERS), "model_name": settings.llm_model_default, "programme_count": status.get("total", 0), "chunk_count": sum(p.get("chunks", 0) for p in status.get("programmes", [])), "bundeslaender": sorted(bl_list, key=lambda x: x["name"]), }) @app.get("/quellen", response_class=HTMLResponse) async def quellen_page(request: Request): """Quellen-Seite mit allen Wahl- und Parteiprogrammen, nach BL gruppiert.""" from .bundeslaender import BUNDESLAENDER programmes = get_programme_info() status = get_indexing_status() # Wahlprogramme nach Bundesland gruppieren by_bl: dict[str, list] = {} grundsatz = [] for prog in programmes: if prog["typ"] == "parteiprogramm": grundsatz.append(prog) else: bl = prog.get("bundesland") or "Sonstige" bl_name = BUNDESLAENDER[bl].name if bl in BUNDESLAENDER else bl by_bl.setdefault(bl_name, []).append(prog) # Sortieren: alphabetisch nach BL-Name wahlprogramme_grouped = sorted(by_bl.items()) return templates.TemplateResponse("quellen.html", { "request": request, "app_name": settings.app_name, "programmes": programmes, "wahlprogramme_grouped": wahlprogramme_grouped, "grundsatzprogramme": grundsatz, "status": status, }) @app.get("/api/wahlprogramm-cite") async def wahlprogramm_cite( request: Request, background_tasks: BackgroundTasks, pid: str = "", pdf: str = "", seite: int = 1, q: str = "", ds: str = "", bl: str = "", ): """Render eine Wahlprogramm-Seite mit gelb hervorgehobener Zitat-Stelle. Issue #47: Klick auf eine Zitat-Quelle im Report soll direkt zur Stelle im Wahlprogramm-PDF springen, mit dem zitierten Snippet visuell markiert. Statt das ganze PDF auszuliefern (Browser scrollt auf #page=N und Leser muss von Hand suchen), liefern wir hier ein 1-Seiten-PDF mit ``add_highlight_annot``-Annotation auf den per ``page.search_for`` gefundenen Bounding-Boxes. Akzeptiert ``pid`` (PROGRAMME-Key) ODER ``pdf`` (Dateiname wie ``spd-grundsatzprogramm.pdf``). Letzterer ermöglicht die retroaktive Nutzung von Pre-#47-URLs im Frontend, wo nur der statische Pfad ``/static/referenzen/#page=`` gespeichert ist. Security: ``pid`` muss ein registrierter PROGRAMME-Key sein — verhindert Path-Traversal und arbiträren File-Read aus dem referenzen-Verzeichnis. ``seite`` wird per Pydantic-Coercion auf int gezwungen. ``q`` ist auf 200 Zeichen begrenzt im Renderer. """ # Reverse-Lookup: pdf-Filename → programm_id, falls nur pdf angegeben. # Zwei Stufen: exakter Match, dann fuzzy (Year-Suffix-Stripping), weil # Pre-#47 Assessments halluzinierte Dateinamen haben können, z.B. # "gruene-grundsatzprogramm-2020.pdf" statt "gruene-grundsatzprogramm.pdf". if not pid and pdf: # Stage 1: exakt for p, info in PROGRAMME.items(): if info.get("pdf") == pdf: pid = p break # Stage 2: Year-Suffix stripping (z.B. "X-2020.pdf" → "X.pdf") if not pid: import re stripped = re.sub(r"-\d{4}\.pdf$", ".pdf", pdf) if stripped != pdf: for p, info in PROGRAMME.items(): if info.get("pdf") == stripped: pid = p break if pid not in PROGRAMME: raise HTTPException(status_code=404, detail="Unbekanntes Wahlprogramm") if seite < 1 or seite > 2000: raise HTTPException(status_code=400, detail="Ungültige Seitennummer") pdf_bytes, found_page, highlighted = render_highlighted_page(pid, seite, q) if pdf_bytes is None: raise HTTPException( status_code=404, detail="Wahlprogramm-PDF oder Seite nicht verfügbar", ) # Issue #47: Wenn das Zitat nicht im PDF auffindbar ist UND wir die # Drucksache kennen, ist das Assessment wahrscheinlich ein Pre-#60- # Halluzinations-Opfer. Automatische Re-Analyse triggern und dem # User eine Warte-Seite zeigen statt ein PDF ohne Highlights. if not highlighted and q and ds and bl: existing = await get_assessment(ds) if existing: adapter = get_adapter(bl) if adapter: # Altes Assessment löschen und neu analysieren await delete_assessment(ds) job_id = str(uuid.uuid4()) await create_job(job_id, f"Re-Analyse {ds} (Zitat nicht verifizierbar)", bl, "qwen-plus") text = await adapter.download_text(ds) if text: doc = await adapter.get_document(ds) background_tasks.add_task( run_drucksache_analysis, job_id, ds, text, bl, "qwen-plus", doc, ) # HTML-Warte-Seite mit Auto-Redirect zurück zum Assessment return HTMLResponse(f""" Wird neu analysiert…

Zitat nicht verifizierbar

Der Antrag {ds} wird mit der aktuellen Pipeline
neu analysiert, um verifizierte Zitate zu erzeugen.

Automatische Weiterleitung in 15 Sekunden…

""") info = PROGRAMME[pid] safe_name = info.get("pdf", f"{pid}.pdf") return Response( content=pdf_bytes, media_type="application/pdf", headers={ "Content-Disposition": f'inline; filename="{safe_name}"', "Cache-Control": "public, max-age=3600", "X-Found-Page": str(found_page), }, ) @app.get("/api/programme/thumbnail/{programm_id}") async def programme_thumbnail(programm_id: str): """Thumbnail der ersten Seite eines Wahlprogramm-PDFs (PNG, 200px breit). Wird auf der Quellen-Seite als Vorschau angezeigt. Cached 24h. """ import fitz if programm_id not in PROGRAMME: raise HTTPException(status_code=404) info = PROGRAMME[programm_id] pdf_path = static_dir / "referenzen" / info["pdf"] if not pdf_path.exists(): raise HTTPException(status_code=404) try: doc = fitz.open(str(pdf_path)) page = doc[0] # 200px Breite, proportional skaliert zoom = 200 / page.rect.width mat = fitz.Matrix(zoom, zoom) pix = page.get_pixmap(matrix=mat) png_bytes = pix.tobytes("png") doc.close() return Response( content=png_bytes, media_type="image/png", headers={"Cache-Control": "public, max-age=86400"}, ) except Exception: raise HTTPException(status_code=500) @app.get("/api/programme") async def list_programme(): """List all available programmes.""" return get_programme_info() @app.get("/api/programme/status") async def programme_status(): """Get indexing status of all programmes.""" return get_indexing_status() @app.post("/api/programme/index") @limiter.limit("3/minute") async def index_programme( request: Request, background_tasks: BackgroundTasks, programm_id: str = Form(None), all_programmes: bool = Form(False), user: dict = Depends(require_admin), ): """Index programme(s) for semantic search.""" pdf_dir = static_dir / "referenzen" if all_programmes: # Index sequentially to avoid DB locks async def index_all_sequential(): for prog_id in PROGRAMME.keys(): try: index_programm(prog_id, pdf_dir) except Exception: logger.exception("Error indexing programme %s", prog_id) background_tasks.add_task(index_all_sequential) return {"status": "indexing", "programmes": list(PROGRAMME.keys())} if programm_id and programm_id in PROGRAMME: background_tasks.add_task(index_programm, programm_id, pdf_dir) return {"status": "indexing", "programm_id": programm_id} raise HTTPException(status_code=400, detail="Ungültiges Programm") # ───────────────────────────────────────────────────────────────────────────── # Auswertungen #58 — Bundesland × Partei × Wahlperiode Aggregations-Sicht # ───────────────────────────────────────────────────────────────────────────── @app.get("/auswertungen", response_class=HTMLResponse) async def auswertungen_page(request: Request): """Statische Seite, die die Matrix-Endpoints per fetch() lädt.""" from .wahlperioden import all_wahlperioden return templates.TemplateResponse("auswertungen.html", { "request": request, "app_name": settings.app_name, "wahlperioden": sorted(all_wahlperioden()), }) @app.get("/api/auswertungen/matrix") async def auswertungen_matrix(wahlperiode: Optional[str] = None): """2D-Matrix Bundesland × Partei mit Anzahl + Ø-GWÖ-Score.""" from .auswertungen import aggregate_matrix return aggregate_matrix(filter_wp=wahlperiode) @app.get("/api/auswertungen/zeitreihe") async def auswertungen_zeitreihe(bundesland: str, partei: str): """Score-Verlauf einer (BL, Partei)-Kombination über alle WPs.""" from .auswertungen import aggregate_zeitreihe return aggregate_zeitreihe(bundesland, partei) @app.get("/api/auswertungen/export.csv") async def auswertungen_export_csv(): """Long-Format-CSV-Export aller Assessments. Deckt #45 mit ab.""" from .auswertungen import export_long_format csv_text = export_long_format() return Response( content=csv_text, media_type="text/csv", headers={"Content-Disposition": 'attachment; filename="gwoe-assessments.csv"'}, ) # Health check @app.get("/health") async def health(): return {"status": "ok", "version": settings.app_version}