gwoe-antragspruefer/app/main.py
Dotty Dotter a8d7b72702 feat(v2): Feedback-Widget mit Audit-Trail + Screenshot + direkter Gitea-Anbindung
- Component v2/components/feedback_widget.html: Button unten links oberhalb der
  Queue, Klick oeffnet Modal mit vorausgefuellten Kontext-Feldern (URL,
  Drucksache, Viewport, User-Agent, letzte 15 Klicks, letzte 10 Console-Errors,
  letzte 5 Page-Loads). Eingaben: Titel, Beschreibung, optional Screenshot
- Audit-Trail-Sammler in localStorage (Ringbuffer 30 Klicks, 10 Errors)
- Screenshot via self-hosted html2canvas 1.4.1 (194 KB unter app/static/v2/lib/)
- Backend POST /api/feedback (rate-limit 5/h):
  - validiert + html-strippt Inputs
  - erstellt Gitea-Issue per API mit Label 'feedback' (Label wird idempotent angelegt)
  - laedt Screenshot als Issue-Asset hoch (Gitea Issue-Attachment-API)
- 4 neue Settings: gitea_token, gitea_api_url, gitea_repo_owner, gitea_repo_name
- Server .env um GITEA_TOKEN ergaenzt
- 10 neue Unit-Tests (mit gemocktem httpx)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 01:00:44 +02:00

2788 lines
114 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GWÖ-Antragsprüfer — FastAPI Webapp."""
import logging
import uuid
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, File, Form, UploadFile, Request, BackgroundTasks, HTTPException, Depends
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, Response
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from .validators import (
MAX_SEARCH_QUERY_LEN,
validate_drucksache,
validate_search_query,
)
# Strukturiertes Logging für die ganze App. uvicorn registriert seinen
# eigenen Root-Handler erst beim Start; wir setzen ein neutrales Format
# für unsere Module früh, damit logger.exception() auch beim Boot greift.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
from .config import settings
from .database import (
init_db, get_job, create_job, update_job,
get_all_assessments, get_assessment, delete_assessment,
upsert_assessment, import_json_assessments,
search_assessments,
toggle_bookmark, get_bookmarks, add_comment, get_comments, delete_comment,
create_subscription, list_subscriptions, list_all_subscriptions, delete_subscription,
delete_subscription_by_id,
upsert_vote, get_votes,
get_assessment_history,
get_abstimmungsverhalten,
merkliste_add, merkliste_remove, merkliste_list, merkliste_bulk_add,
)
from .parlamente import get_adapter, ADAPTERS
from .bundeslaender import alle_bundeslaender
from .analyzer import analyze_antrag
from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, keycloak_admin_token, _is_auth_enabled
def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str:
"""Wähle den besten Titel aus LLM-Output und Adapter-Metadata.
Priorität:
1. doc_title, wenn ein echter Titel (nicht "Drucksache XX")
2. llm_title, wenn nicht leer und nicht generisch
3. Generischer Fallback "Drucksache XX"
"""
generic_prefix = f"Drucksache {drucksache.split('/')[0]}"
# doc_title gut? (nicht generisch, nicht leer)
if doc_title and not doc_title.startswith("Drucksache ") and len(doc_title) > 5:
return doc_title
# LLM-Titel gut? (nicht generisch)
if llm_title and not llm_title.startswith("Drucksache ") and len(llm_title) > 5:
return llm_title
# doc_title als Fallback (auch wenn generisch)
return doc_title or llm_title or f"Drucksache {drucksache}"
from .report import generate_html_report, generate_pdf_report
from .embeddings import (
init_embeddings_db, get_programme_info, get_indexing_status,
index_programm, render_highlighted_page, PROGRAMME,
)
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
docs_url=None, # Disable /docs in production
redoc_url=None, # Disable /redoc in production
openapi_url=None, # Disable /openapi.json in production
)
# Rate-Limiter — fängt Resource-Exhaustion auf den teuren POST-Endpoints
# (LLM-Calls + Indexing). Issue #57 Befund #1 (HIGH). Default in-memory
# Storage; für mehrere Worker müsste man auf Redis umstellen, solange wir
# auf einem Container laufen reicht das Default-Storage.
limiter = Limiter(key_func=get_remote_address, default_limits=[])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Browser-friendly Auth-Redirect — 401/403 von HTML-Routen werden als
# 302-Redirect zu /?login=1 ausgeliefert (Login-Modal öffnet sich automatisch).
# API-Calls (Accept: application/json) bleiben bei 401/403-JSON.
@app.exception_handler(HTTPException)
async def _auth_redirect_handler(request: Request, exc: HTTPException):
if exc.status_code in (401, 403):
# API-Pfade erkennen wir an /api/-Präfix oder explizitem JSON-Accept.
accept = request.headers.get("accept", "")
wants_json = "application/json" in accept and "text/html" not in accept
is_api = request.url.path.startswith("/api/")
is_browser = not is_api and not wants_json
if is_browser:
from fastapi.responses import RedirectResponse
target = f"/?login=1&next={request.url.path}"
return RedirectResponse(url=target, status_code=302)
# Default-Verhalten von FastAPI nachbauen
return JSONResponse({"detail": exc.detail}, status_code=exc.status_code, headers=exc.headers or None)
# Security Headers Middleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
# CSP: Allow self, inline styles (for templates), and PDF viewer
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"script-src 'self' 'unsafe-inline'; "
"img-src 'self' data:; "
"frame-ancestors 'none';"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
# Setup directories
settings.data_dir.mkdir(exist_ok=True)
settings.reports_dir.mkdir(exist_ok=True)
# Static files and templates
static_dir = Path(__file__).parent / "static"
templates_dir = Path(__file__).parent / "templates"
static_dir.mkdir(exist_ok=True)
templates_dir.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=static_dir), name="static")
templates = Jinja2Templates(directory=str(templates_dir))
# ─── Auth-Fehler bei HTML-Seiten: Redirect statt JSON-401/403 ─────────────────
@app.exception_handler(401)
async def auth_required_redirect(request: Request, exc: HTTPException):
"""Bei 401 auf HTML-Routes: Redirect zu /?login=1 (öffnet Auth-Modal)."""
accept = request.headers.get("accept", "")
if "text/html" in accept:
from fastapi.responses import RedirectResponse
return RedirectResponse("/?login=1", status_code=302)
return JSONResponse({"detail": exc.detail}, status_code=401)
@app.exception_handler(403)
async def admin_required_redirect(request: Request, exc: HTTPException):
"""Bei 403 auf HTML-Routes: Redirect zu /?login=1 (öffnet Auth-Modal)."""
accept = request.headers.get("accept", "")
if "text/html" in accept:
from fastapi.responses import RedirectResponse
return RedirectResponse("/?login=1", status_code=302)
return JSONResponse({"detail": exc.detail}, status_code=403)
@app.on_event("startup")
async def startup():
import asyncio
await init_db()
init_embeddings_db()
# Job-Queue Worker starten (#95) — Worker ZUERST, dann Re-Enqueue
# im Hintergrund (damit die Webapp sofort erreichbar ist)
from .queue import start_worker, re_enqueue_pending
start_worker()
asyncio.create_task(re_enqueue_pending(analysis_callback=run_drucksache_analysis))
@app.on_event("shutdown")
async def shutdown():
"""Graceful Shutdown: warte auf laufende Queue-Jobs bevor der Container stirbt."""
from .queue import graceful_shutdown
await graceful_shutdown(timeout=900) # 15 min — passend zu stop_grace_period
# JSON import disabled - all assessments now live in SQLite DB only
# Legacy import would overwrite new v5 assessments with old format
# count = await import_json_assessments(settings.data_dir / "assessments")
# if count > 0:
# print(f"Imported {count} assessments from JSON files")
@app.get("/classic", response_class=HTMLResponse)
async def classic_index(request: Request):
"""Klassische Ansicht (v1) — jetzt unter /classic erreichbar."""
# Frontend-Liste: synthetischer "ALL"-Eintrag (Bundesweit) zuerst, dann
# die echten Bundesländer aus der Konfig. Der "ALL"-Code ist eine reine
# Frontend/API-Konvention, kein Eintrag in bundeslaender.py.
bl_list = [{"code": "ALL", "name": "🌍 Bundesweit", "active": True}]
bl_list.extend(
{"code": bl.code, "name": bl.name, "active": bl.aktiv}
for bl in alle_bundeslaender()
)
# Map code → parlament_name, damit das Frontend ohne extra Backend-Call
# für jeden Antrag den Parlamentsnamen anzeigen kann.
parlament_names = {
bl.code: bl.parlament_name for bl in alle_bundeslaender()
}
from .models import MATRIX_LABELS
return templates.TemplateResponse("index.html", {
"request": request,
"app_name": settings.app_name,
"bundeslaender": bl_list,
"parlament_names": parlament_names,
"matrix_labels": MATRIX_LABELS,
"matrix_explanations": {
"A1": "Wenn Ihre Stadt Büromöbel kauft: Wurden die unter menschenwürdigen Bedingungen hergestellt? Oder in einer Fabrik, in der Arbeiter:innen ausgebeutet werden? Hier geht es darum, ob die öffentliche Hand beim Einkauf auf Menschenrechte achtet.",
"A2": "Beauftragt die Stadt den Handwerksbetrieb aus dem Ort — oder den billigsten Konzern aus dem Ausland? Bleibt das Geld in der Region und schafft Arbeitsplätze vor Ort?",
"A3": "Werden bei öffentlichen Aufträgen Klimastandards verlangt? Kommt das Schulessen von regionalen Bauernhöfen oder wird es quer durch Europa gekarrt?",
"A4": "Verdienen die Reinigungskräfte im Rathaus einen fairen Lohn? Haben Subunternehmer die gleichen Arbeitsbedingungen wie Festangestellte?",
"A5": "Können Sie als Bürger:in nachschauen, welche Firma den Auftrag für den Straßenbau bekommen hat — und warum? Oder passiert das alles hinter verschlossenen Türen?",
"B1": "Liegt das Geld Ihrer Stadt bei einer Bank, die auch Waffengeschäfte finanziert? Oder bei einer ethischen Bank, die in soziale Projekte investiert?",
"B2": "Fließen Ihre Steuergelder in einen neuen Radweg für alle — oder in eine Umgehungsstraße, die nur dem Gewerbegebiet nützt?",
"B3": "Investiert Ihre Kommune in Solaranlagen auf Schuldächern? Oder wird das Geld in klimaschädliche Projekte gesteckt?",
"B4": "Bekommen ärmere Stadtteile genauso viel Geld für Spielplätze und Schulen wie reiche? Oder konzentrieren sich die Investitionen dort, wo die Grundstückspreise schon hoch sind?",
"B5": "Gibt es einen Bürgerhaushalt, bei dem Sie mitbestimmen können, ob das Geld in die Bibliothek oder den Sportplatz fließt? Oder entscheidet das der Stadtrat allein?",
"C1": "Werden in Ihrer Stadtverwaltung Frauen gleich bezahlt? Haben Menschen mit Behinderung faire Chancen auf eine Stelle? Gibt es Schutz vor Mobbing?",
"C2": "Hat Ihre Stadt ein Klimaschutzkonzept, das alle Ämter gemeinsam umsetzen? Oder kocht jedes Amt sein eigenes Süppchen?",
"C3": "Fahren die Mitarbeiter:innen des Rathauses mit dem Dienstrad oder dem SUV? Gibt es vegetarisches Essen in der Kantine?",
"C4": "Können Eltern in der Verwaltung Teilzeit arbeiten, ohne Karrierenachteile? Gibt es flexible Arbeitszeiten für pflegende Angehörige?",
"C5": "Können Sie die Sitzungsprotokolle des Stadtrats online lesen? Verstehen Sie, warum Entscheidungen so und nicht anders gefallen sind?",
"D1": "Werden Sie auf dem Amt fair behandelt — egal ob Sie einen deutschen oder ausländischen Namen haben? Schützt die Polizei alle gleich?",
"D2": "Profitiert die ganze Stadt von dem Antrag — oder nur ein Stadtteil, eine Altersgruppe, eine Einkommensschicht?",
"D3": "Kommt der Strom für die Straßenbeleuchtung aus Erneuerbaren? Wird das Regenwasser im Park versickert statt in die Kanalisation geleitet?",
"D4": "Kann sich die alleinerziehende Mutter den Kitaplatz leisten? Bekommt der Rentner noch einen Arzttermin? Findet die Familie mit drei Kindern eine bezahlbare Wohnung?",
"D5": "Werden Sie gefragt, bevor die Straße vor Ihrem Haus umgebaut wird? Gibt es Bürgerversammlungen, Online-Beteiligung, Jugendparlamente?",
"E1": "Hinterlassen wir unseren Enkeln einen Schuldenberg und versiegelte Flächen? Oder investieren wir heute so, dass auch 2050 noch gute Lebensbedingungen herrschen?",
"E2": "Hilft der Antrag nur Ihrer Stadt — oder auch den Nachbargemeinden? Gibt es regionale Kooperationen, von denen alle profitieren?",
"E3": "Denkt Ihre Kommune beim Einkauf auch an den CO₂-Fußabdruck? An die Abholzung von Regenwäldern für billiges Papier? An den Wasserverbrauch in Dürregebieten?",
"E4": "Unterstützt Ihre Stadt strukturschwache Regionen? Gibt es Partnerschaften mit Kommunen im globalen Süden?",
"E5": "Setzt sich Ihre Kommune für mehr Demokratie ein — auch auf Landes- und Bundesebene? Werden internationale Abkommen unterstützt?",
},
})
# ─── Default: / → v2 (Default-Flip #139 Phase 2) ────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
"""Startseite — rendert v2-Listenansicht (Default-Flip Phase 2).
Alte /?drucksache=XX-YYYY Deep-Links werden per 301-Redirect auf
/antrag/XX-YYYY weitergeleitet, damit Bookmarks weiter funktionieren.
"""
from fastapi.responses import RedirectResponse
# Deep-Link-Kompatibilität: /?drucksache=18/12345 → /antrag/18/12345
drucksache_param = request.query_params.get("drucksache")
if drucksache_param:
return RedirectResponse(f"/antrag/{drucksache_param}", status_code=301)
rows = await get_all_assessments(None)
assessments = _rows_to_list(rows)
bl_codes = sorted({a["bundesland"] for a in assessments if a.get("bundesland")})
return templates.TemplateResponse("v2/screens/durchsuchen.html", {
"request": request,
"v2_active_nav": "durchsuchen",
"assessments": assessments,
"bl_codes": bl_codes,
"assessment_count": len(assessments),
**_v2_template_context(current_user),
})
@app.get("/antrag/{drucksache:path}", response_class=HTMLResponse)
async def antrag_detail(request: Request, drucksache: str, current_user: Optional[dict] = Depends(get_current_user)):
"""v2-Antragsdetail-Route — Server-Side-Rendering mit echten DB-Daten."""
try:
drucksache = validate_drucksache(drucksache)
except Exception:
return templates.TemplateResponse("v2/screens/antrag_detail.html", {
"request": request,
"v2_active_nav": "durchsuchen",
"error": f"Ungültige Drucksachen-ID: {drucksache}",
**_v2_template_context(current_user),
}, status_code=400)
row = await get_assessment(drucksache)
if not row:
return templates.TemplateResponse("v2/screens/antrag_detail.html", {
"request": request,
"v2_active_nav": "durchsuchen",
"error": f"Antrag {drucksache} wurde nicht gefunden.",
**_v2_template_context(current_user),
}, status_code=404)
antrag = _row_to_detail(row)
# #106 Phase 1: namentliche Abstimmungsdaten ergänzen (optional, kann None sein)
try:
antrag["abstimmungsverhalten"] = await get_abstimmungsverhalten(drucksache)
except Exception:
logger.exception("Fehler beim Laden von Abstimmungsverhalten für %s", drucksache)
antrag["abstimmungsverhalten"] = None
from .models import MATRIX_LABELS
return templates.TemplateResponse("v2/screens/antrag_detail.html", {
"request": request,
"v2_active_nav": "durchsuchen",
"antrag": antrag,
"assessment_count": None,
"matrix_explanations": {
"A1": "Wenn Ihre Stadt Büromöbel kauft: Wurden die unter menschenwürdigen Bedingungen hergestellt? Oder in einer Fabrik, in der Arbeiter:innen ausgebeutet werden? Hier geht es darum, ob die öffentliche Hand beim Einkauf auf Menschenrechte achtet.",
"A2": "Beauftragt die Stadt den Handwerksbetrieb aus dem Ort — oder den billigsten Konzern aus dem Ausland? Bleibt das Geld in der Region und schafft Arbeitsplätze vor Ort?",
"A3": "Werden bei öffentlichen Aufträgen Klimastandards verlangt? Kommt das Schulessen von regionalen Bauernhöfen oder wird es quer durch Europa gekarrt?",
"A4": "Verdienen die Reinigungskräfte im Rathaus einen fairen Lohn? Haben Subunternehmer die gleichen Arbeitsbedingungen wie Festangestellte?",
"A5": "Können Sie als Bürger:in nachschauen, welche Firma den Auftrag für den Straßenbau bekommen hat — und warum? Oder passiert das alles hinter verschlossenen Türen?",
"B1": "Liegt das Geld Ihrer Stadt bei einer Bank, die auch Waffengeschäfte finanziert? Oder bei einer ethischen Bank, die in soziale Projekte investiert?",
"B2": "Fließen Ihre Steuergelder in einen neuen Radweg für alle — oder in eine Umgehungsstraße, die nur dem Gewerbegebiet nützt?",
"B3": "Investiert Ihre Kommune in Solaranlagen auf Schuldächern? Oder wird das Geld in klimaschädliche Projekte gesteckt?",
"B4": "Bekommen ärmere Stadtteile genauso viel Geld für Spielplätze und Schulen wie reiche? Oder konzentrieren sich die Investitionen dort, wo die Grundstückspreise schon hoch sind?",
"B5": "Gibt es einen Bürgerhaushalt, bei dem Sie mitbestimmen können, ob das Geld in die Bibliothek oder den Sportplatz fließt? Oder entscheidet das der Stadtrat allein?",
"C1": "Werden in Ihrer Stadtverwaltung Frauen gleich bezahlt? Haben Menschen mit Behinderung faire Chancen auf eine Stelle? Gibt es Schutz vor Mobbing?",
"C2": "Hat Ihre Stadt ein Klimaschutzkonzept, das alle Ämter gemeinsam umsetzen? Oder kocht jedes Amt sein eigenes Süppchen?",
"C3": "Fahren die Mitarbeiter:innen des Rathauses mit dem Dienstrad oder dem SUV? Gibt es vegetarisches Essen in der Kantine?",
"C4": "Können Eltern in der Verwaltung Teilzeit arbeiten, ohne Karrierenachteile? Gibt es flexible Arbeitszeiten für pflegende Angehörige?",
"C5": "Können Sie die Sitzungsprotokolle des Stadtrats online lesen? Verstehen Sie, warum Entscheidungen so und nicht anders gefallen sind?",
"D1": "Werden Sie auf dem Amt fair behandelt — egal ob Sie einen deutschen oder ausländischen Namen haben? Schützt die Polizei alle gleich?",
"D2": "Profitiert die ganze Stadt von dem Antrag — oder nur ein Stadtteil, eine Altersgruppe, eine Einkommensschicht?",
"D3": "Kommt der Strom für die Straßenbeleuchtung aus Erneuerbaren? Wird das Regenwasser im Park versickert statt in die Kanalisation geleitet?",
"D4": "Kann sich die alleinerziehende Mutter den Kitaplatz leisten? Bekommt der Rentner noch einen Arzttermin? Findet die Familie mit drei Kindern eine bezahlbare Wohnung?",
"D5": "Werden Sie gefragt, bevor die Straße vor Ihrem Haus umgebaut wird? Gibt es Bürgerversammlungen, Online-Beteiligung, Jugendparlamente?",
"E1": "Hinterlassen wir unseren Enkeln einen Schuldenberg und versiegelte Flächen? Oder investieren wir heute so, dass auch 2050 noch gute Lebensbedingungen herrschen?",
"E2": "Hilft der Antrag nur Ihrer Stadt — oder auch den Nachbargemeinden? Gibt es regionale Kooperationen, von denen alle profitieren?",
"E3": "Denkt Ihre Kommune beim Einkauf auch an den CO₂-Fußabdruck? An die Abholzung von Regenwäldern für billiges Papier? An den Wasserverbrauch in Dürregebieten?",
"E4": "Unterstützt Ihre Stadt strukturschwache Regionen? Gibt es Partnerschaften mit Kommunen im globalen Süden?",
"E5": "Setzt sich Ihre Kommune für mehr Demokratie ein — auch auf Landes- und Bundesebene? Werden internationale Abkommen unterstützt?",
},
"matrix_labels": MATRIX_LABELS,
**_v2_template_context(current_user),
})
def _v2_template_context(current_user=None) -> dict:
"""Gemeinsame v2-Template-Variablen: is_admin, is_authenticated, v2_bundeslaender.
Wird in jeder v2-Route aufgerufen und per **-Spread in den Template-Context gemischt.
"""
is_authenticated = bool(current_user and current_user.get("authenticated", False))
# require_auth liefert keinen "authenticated"-Key, aber ein sub-Feld — beides prüfen
if current_user and current_user.get("sub"):
is_authenticated = True
roles = (current_user or {}).get("roles", [])
is_admin = "admin" in roles or "gwoe-admin" in roles
v2_bls = [
{"code": bl.code, "name": bl.name}
for bl in alle_bundeslaender()
if bl.aktiv
]
return {
"is_authenticated": is_authenticated,
"is_admin": is_admin,
"v2_bundeslaender": v2_bls,
"app_version": settings.app_version,
}
def _rows_to_list(rows):
"""Konvertiert DB-Rows in das flache Dict-Format für die v2-Listenansicht."""
result = []
for row in rows:
result.append({
"drucksache": row.get("drucksache", ""),
"title": row.get("title", ""),
"score": row.get("gwoe_score") or 0.0,
"parteien": row.get("fraktionen", []),
"bundesland": row.get("bundesland", ""),
"tags": row.get("themen", []),
"datum": row.get("datum", ""),
})
return result
from .redline_utils import parse_redline_segments as _parse_redline_segments
from .redline_utils import build_pdf_href as _build_pdf_href
def _row_to_detail(row):
"""Konvertiert eine DB-Row in das antrag-Dict für den v2-Detailscreen."""
import datetime
fraktionen = row.get("fraktionen", [])
gwoe_matrix_raw = row.get("gwoe_matrix", [])
wahlprogramm_scores = row.get("wahlprogramm_scores", [])
verbesserungen = row.get("verbesserungen", [])
staerken = row.get("staerken", [])
schwaechen = row.get("schwaechen", [])
bundesland = row.get("bundesland", "")
# gwoe_matrix ist ein Array [{field, rating, symbol, …}] → Dict A1→{rating,symbol}
matrix_dict = {}
for cell in (gwoe_matrix_raw or []):
field = cell.get("field", "")
if field:
rating_raw = cell.get("rating", 0)
# DB speichert rating als 1-5 Skala (1=,2=,3=○,4=+,5=++),
# matrix_mini erwartet -2..+2. Mapping: DB-3=0, DB-4=+1, DB-5=+2, DB-2=1, DB-1=2
rating_normalized = int(rating_raw) - 3
symbol = cell.get("symbol", "")
matrix_dict[field] = {"rating": rating_normalized, "symbol": symbol}
# fraktions_scores: numerische Scores + Begründungen + leere Zitat-Listen (Fix 2+3)
fraktions_scores = []
for wp in (wahlprogramm_scores or []):
fraktion = wp.get("fraktion", "")
wp_src = wp.get("wahlprogramm") or {}
pp_src = wp.get("parteiprogramm") or {}
fraktions_scores.append({
"fraktion": fraktion,
"ist_antragsteller": wp.get("istAntragsteller", wp.get("ist_antragsteller")),
"ist_regierung": wp.get("istRegierung", wp.get("ist_regierung")),
"wahlprogramm": {
"score": wp_src.get("score", 0),
"begruendung": wp_src.get("begruendung", wp_src.get("begründung", "")),
"hat_zitate": bool(wp_src.get("zitate")),
},
"parteiprogramm": {
"score": pp_src.get("score", 0),
"begruendung": pp_src.get("begruendung", pp_src.get("begründung", "")),
"hat_zitate": bool(pp_src.get("zitate")),
},
})
# Zitate aus wahlprogramm_scores extrahieren (flache Liste über alle Fraktionen)
# PDF-Href wird bevorzugt aus url-Feld genommen, sonst aus quelle rekonstruiert
zitate = []
for wp in (wahlprogramm_scores or []):
fraktion = wp.get("fraktion", "")
for src_key in ("wahlprogramm", "parteiprogramm"):
src = wp.get(src_key) or {}
for zitat in (src.get("zitate") or []):
zitate.append({
"text": zitat.get("text", ""),
"source": zitat.get("quelle", ""),
"partei": fraktion,
"verified": True,
"contra": False,
"pdf_href": _build_pdf_href(zitat, bundesland),
})
# Stärkster/schwächster Wert aus staerken/schwaechen (erste Einträge)
staerkster = {"titel": "Stärken", "text": "; ".join(staerken[:2]) if staerken else ""}
schwaechster = {"titel": "Schwächen", "text": "; ".join(schwaechen[:2]) if schwaechen else ""}
# Verbesserungen: §INS§/§DEL§- und **/**-/~~Marker parsen (Fix 1)
verbesserungen_parsed = []
for v in (verbesserungen or []):
vorschlag_raw = v.get("vorschlag", "")
verbesserungen_parsed.append({
"original": v.get("original", ""),
"vorschlag": vorschlag_raw,
"begruendung": v.get("begruendung", ""),
"segments": _parse_redline_segments(vorschlag_raw),
})
# Redline-Fallback (wird nur genutzt wenn verbesserungen leer, für rückwärts-Compat)
redline_data = {"segments": []}
if not verbesserungen_parsed and verbesserungen:
v = verbesserungen[0]
redline_data["segments"] = _parse_redline_segments(v.get("vorschlag", ""))
# Datum normalisieren: ISO → lesbar
datum_raw = row.get("datum", "")
datum_display = datum_raw
try:
d = datetime.date.fromisoformat(datum_raw[:10])
datum_display = d.strftime("%d.%m.%Y")
except Exception:
pass
updated_at = row.get("updated_at", "")
analysiert_display = updated_at
try:
d2 = datetime.datetime.fromisoformat(updated_at[:19])
analysiert_display = d2.strftime("%d.%m.%Y")
except Exception:
pass
# gwoe_score als float (DB kann None liefern)
score = float(row.get("gwoe_score") or 0.0)
empfehlung = row.get("empfehlung") or ""
return {
"drucksache": row.get("drucksache", ""),
"bundesland": row.get("bundesland", ""),
"parlament": "Landtag",
"typ": "Antrag",
"datum": datum_display,
"analysiert": analysiert_display,
"modell": row.get("model", ""),
"parteien": fraktionen,
"zitate_count": len(zitate),
"title": row.get("title", ""),
"score": score,
"verdict_title": empfehlung,
"verdict_body": row.get("gwoe_begruendung", "") or "",
"zusammenfassung": row.get("antrag_zusammenfassung", "") or "",
"staerkster_wert": staerkster,
"schwaechster_wert": schwaechster,
"redline": redline_data,
"matrix": matrix_dict,
"zitate": zitate,
"fraktions_scores": fraktions_scores,
"verbesserungen": verbesserungen_parsed,
"staerken": staerken,
"schwaechen": schwaechen,
"share_threads": row.get("share_threads"),
"share_twitter": row.get("share_twitter"),
"share_mastodon": row.get("share_mastodon"),
# Roher ISO-Zeitstempel für OG-Cache-Key (#141)
"updated_at_raw": row.get("updated_at", ""),
}
@app.post("/analyze")
@limiter.limit("10/minute")
async def start_analysis(
request: Request,
background_tasks: BackgroundTasks,
text: Optional[str] = Form(None),
file: Optional[UploadFile] = File(None),
bundesland: str = Form("NRW"),
model: str = Form("qwen-plus"),
user: dict = Depends(require_auth),
):
"""Start analysis job."""
if not text and not file:
raise HTTPException(status_code=400, detail="Entweder Text oder PDF-Datei erforderlich")
# Extract text from PDF if uploaded
if file and file.filename:
import fitz # PyMuPDF
pdf_bytes = await file.read()
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
text = ""
for page in doc:
text += page.get_text()
doc.close()
# Create job
job_id = str(uuid.uuid4())
await create_job(job_id, text[:500], bundesland, model)
# Start background analysis
background_tasks.add_task(run_analysis, job_id, text, bundesland, model)
return JSONResponse({"job_id": job_id, "status": "queued"})
async def run_analysis(job_id: str, text: str, bundesland: str, model: str):
"""Background task for analysis."""
try:
await update_job(job_id, status="processing")
# Run LLM analysis
assessment = await analyze_antrag(text, bundesland, model)
# Generate reports
html_path = settings.reports_dir / f"{job_id}.html"
pdf_path = settings.reports_dir / f"{job_id}.pdf"
await generate_html_report(assessment, html_path, bundesland=bundesland)
await generate_pdf_report(assessment, pdf_path, bundesland=bundesland)
await update_job(
job_id,
status="completed",
result=assessment.model_dump_json(),
html_path=str(html_path),
pdf_path=str(pdf_path),
)
except Exception as e:
await update_job(job_id, status="failed", error=str(e))
@app.get("/status/{job_id}")
async def get_status(job_id: str):
"""Get job status."""
job = await get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job nicht gefunden")
return JSONResponse({
"job_id": job_id,
"status": job["status"],
"created_at": job["created_at"],
})
@app.get("/result/{job_id}", response_class=HTMLResponse)
async def get_result(request: Request, job_id: str):
"""Get analysis result as HTML."""
job = await get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job nicht gefunden")
if job["status"] != "completed":
raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {job['status']}")
html_path = Path(job["html_path"])
if html_path.exists():
return HTMLResponse(html_path.read_text())
raise HTTPException(status_code=500, detail="Report nicht gefunden")
@app.get("/result/{job_id}/pdf")
async def get_pdf(job_id: str):
"""Download PDF report."""
job = await get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job nicht gefunden")
if job["status"] != "completed":
raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {job['status']}")
pdf_path = Path(job["pdf_path"])
if pdf_path.exists():
return FileResponse(
pdf_path,
media_type="application/pdf",
filename=f"gwoe-bericht-{job_id[:8]}.pdf"
)
raise HTTPException(status_code=500, detail="PDF nicht gefunden")
# ─── Queue-Status (#95) ─────────────────────────────────────────────────────
@app.get("/api/queue/status")
async def queue_status():
"""Aktueller Queue-Stand: wartende Jobs, geschätzte Wartezeit."""
from .queue import get_queue_status
return get_queue_status()
# ─── Auth-Endpoints (#43) ───────────────────────────────────────────────────
@app.get("/api/auth/me")
async def auth_me(user=Depends(get_current_user)):
"""User-Info oder null wenn nicht eingeloggt.
Das Frontend ruft diesen Endpoint beim Load auf, um zu entscheiden
ob "Bewerten" aktiv oder ausgegraut ist.
"""
if user:
return {"authenticated": True, **user}
return {"authenticated": False}
@app.get("/api/auth/callback")
async def auth_callback(request: Request, code: str = "", state: str = ""):
"""OIDC Authorization Code → Access Token Exchange.
Keycloak redirects hierher nach Login mit ?code=... Parameter.
Wir tauschen den Code gegen ein Access Token und setzen es als Cookie.
"""
if not _is_auth_enabled() or not code:
from fastapi.responses import RedirectResponse
return RedirectResponse("/")
from .auth import _keycloak_issuer
token_url = f"{_keycloak_issuer()}/protocol/openid-connect/token"
# Construct the same redirect_uri used for the auth request
base = str(request.base_url).rstrip("/").replace("http://", "https://")
redirect_uri = f"{base}/api/auth/callback"
import httpx as _httpx
async with _httpx.AsyncClient(timeout=10) as client:
resp = await client.post(token_url, data={
"grant_type": "authorization_code",
"client_id": settings.keycloak_client_id,
"code": code,
"redirect_uri": redirect_uri,
})
if resp.status_code != 200:
logger.error("Token exchange failed: %s %s", resp.status_code, resp.text[:200])
raise HTTPException(status_code=401, detail="Login fehlgeschlagen")
tokens = resp.json()
access_token = tokens.get("access_token", "")
expires_in = tokens.get("expires_in", 3600)
# HTML-Response statt RedirectResponse: setzt Cookie UND redirected.
# RedirectResponse mit Set-Cookie wird von manchen Browsern bei
# 307/302 ignoriert (insb. hinter Reverse-Proxies).
return HTMLResponse(
f"""<!DOCTYPE html><html><head>
<meta http-equiv="refresh" content="0;url=/">
</head><body><p>Anmeldung erfolgreich, Weiterleitung...</p></body></html>""",
headers={
"Set-Cookie": (
f"access_token={access_token}; Path=/; Secure; HttpOnly; "
f"SameSite=Lax; Max-Age={expires_in}"
)
},
)
@app.get("/api/auth/login-url")
async def auth_login_url(request: Request, redirect: str = "/"):
"""Keycloak-Login-URL für den Browser-Redirect."""
if not _is_auth_enabled():
return {"enabled": False, "url": ""}
base = str(request.base_url).rstrip("/").replace("http://", "https://")
url = keycloak_login_url(f"{base}/api/auth/callback")
return {"enabled": True, "url": url}
@app.get("/api/auth/forgot-password")
async def auth_forgot_password(request: Request):
"""Redirect zur Keycloak-Passwort-Reset-Seite (#143-Folge).
Keycloak bietet bei `resetPasswordAllowed=True` eine eigene Reset-Page,
die per Mail einen Link zum Passwort-Setzen schickt. Wir leiten direkt
dahin um statt eine eigene UI zu bauen.
"""
from fastapi.responses import RedirectResponse
base = str(request.base_url).rstrip("/").replace("http://", "https://")
issuer = f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
target = (
f"{issuer}/login-actions/reset-credentials"
f"?client_id={settings.keycloak_client_id}"
f"&redirect_uri={base}/"
)
return RedirectResponse(url=target, status_code=302)
@app.post("/api/auth/login")
async def auth_direct_login(
username: str = Form(...),
password: str = Form(...),
):
"""Direct Access Grant Login (#129) — kein Redirect zu Keycloak.
Ruft Keycloak per Resource Owner Password Credentials (Direct Access Grant) an.
Setzt access_token als HttpOnly-Cookie und refresh_token als separates rt-Cookie.
KEYCLOAK-VORAUSSETZUNG: Im Client muss "Direct Access Grants" aktiviert sein.
Keycloak Admin → Realm → Clients → gwoe-antragspruefer → Capability config →
"Direct access grants enabled" = ON.
Fehler-Mapping:
- 401 → {"error": "invalid_credentials", "msg": "..."}
- Keycloak nicht erreichbar / sonstiges → {"error": "unknown", "msg": "..."}
"""
from .auth import direct_login, _validate_token
try:
token_data = await direct_login(username, password)
except HTTPException as exc:
if exc.status_code == 401:
return JSONResponse(
status_code=401,
content={"error": "invalid_credentials", "msg": exc.detail},
)
return JSONResponse(
status_code=exc.status_code,
content={"error": "unknown", "msg": exc.detail},
)
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 300)
refresh_token = token_data.get("refresh_token")
refresh_expires_in = token_data.get("refresh_expires_in", 1800)
# Validiere Token um User-Infos zu extrahieren.
# _validate_token gibt {sub, email, name, roles} zurück, wobei name
# bereits auf preferred_username normalisiert wurde (siehe auth.py L144).
user_payload = await _validate_token(access_token)
user_info = {}
if user_payload:
user_info = {
"sub": user_payload.get("sub"),
"preferred_username": user_payload.get("name", username),
"name": user_payload.get("name", username),
"email": user_payload.get("email", ""),
}
response = JSONResponse({
"authenticated": True,
"expires_in": expires_in,
"user": user_info,
})
response.set_cookie(
"access_token",
access_token,
max_age=expires_in,
httponly=True,
secure=True,
samesite="lax",
path="/",
)
if refresh_token:
response.set_cookie(
"rt",
refresh_token,
max_age=refresh_expires_in,
httponly=True,
secure=True,
samesite="lax",
path="/api/auth/refresh",
)
return response
@app.post("/api/auth/logout")
async def auth_logout():
"""Logout — löscht access_token + rt-Cookies (HttpOnly, daher server-seitig)."""
response = JSONResponse({"authenticated": False})
response.delete_cookie("access_token", path="/")
response.delete_cookie("rt", path="/api/auth/refresh")
return response
@app.post("/api/auth/refresh")
async def auth_refresh(request: Request):
"""Refresh-Token-Endpoint (#129) — holt neuen access_token via refresh_token-Cookie.
Setzt neuen access_token-Cookie bei Erfolg.
"""
from .auth import _keycloak_issuer
refresh_token = request.cookies.get("rt")
if not refresh_token:
raise HTTPException(status_code=401, detail="Kein Refresh-Token")
import httpx as _httpx
async with _httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{_keycloak_issuer()}/protocol/openid-connect/token",
data={
"grant_type": "refresh_token",
"client_id": settings.keycloak_client_id,
"refresh_token": refresh_token,
},
)
if resp.status_code != 200:
raise HTTPException(status_code=401, detail="Refresh fehlgeschlagen")
token_data = resp.json()
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 300)
response = JSONResponse({"authenticated": True, "expires_in": expires_in})
response.set_cookie(
"access_token",
access_token,
max_age=expires_in,
httponly=True,
secure=True,
samesite="lax",
path="/",
)
return response
# ─── Bookmarks + Comments (#94) ─────────────────────────────────────────────
@app.post("/api/bookmark")
async def bookmark_toggle(
drucksache: str = Form(...),
user: dict = Depends(require_auth),
):
"""Toggle bookmark für einen Antrag."""
is_bookmarked = await toggle_bookmark(user["sub"], drucksache)
return {"bookmarked": is_bookmarked, "drucksache": drucksache}
@app.get("/api/bookmarks")
async def bookmarks_list(user=Depends(get_current_user)):
"""Liste aller Bookmarks des aktuellen Users."""
if not user:
return []
return await get_bookmarks(user["sub"])
# ─── Merkliste — serverseitig (#140) ─────────────────────────────────────────
class _MerklisteAddBody(BaseModel):
antrag_id: str
notiz: Optional[str] = None
class _MerklisteBulkBody(BaseModel):
entries: list[dict]
@app.post("/api/me/merkliste")
async def merkliste_add_endpoint(
body: _MerklisteAddBody,
user: dict = Depends(require_auth),
):
"""Antrag zur Merkliste hinzufügen (Upsert). Erfordert Anmeldung."""
entry = await merkliste_add(user["sub"], body.antrag_id, body.notiz)
return entry
@app.delete("/api/me/merkliste/{antrag_id:path}")
async def merkliste_remove_endpoint(
antrag_id: str,
user: dict = Depends(require_auth),
):
"""Antrag aus der Merkliste entfernen. Erfordert Anmeldung."""
removed = await merkliste_remove(user["sub"], antrag_id)
return {"removed": removed, "antrag_id": antrag_id}
@app.get("/api/me/merkliste")
async def merkliste_list_endpoint(user: dict = Depends(require_auth)):
"""Alle Merklisten-Einträge des aktuellen Users. Erfordert Anmeldung."""
return await merkliste_list(user["sub"])
@app.post("/api/me/merkliste/bulk-import")
async def merkliste_bulk_import(
body: _MerklisteBulkBody,
user: dict = Depends(require_auth),
):
"""Mehrere Einträge auf einmal importieren (für localStorage-Migration).
Body: ``{"entries": [{"antrag_id": "18/1234"}, …]}``
"""
count = await merkliste_bulk_add(user["sub"], body.entries)
return {"imported": count}
# ─── E-Mail-Abonnements (#124) ───────────────────────────────────────────────
@app.post("/api/subscriptions")
async def subscription_create(
bundesland: Optional[str] = Form(None),
partei: Optional[str] = Form(None),
frequency: str = Form("daily"),
user: dict = Depends(require_auth),
):
"""Neues Abo für Benachrichtigungen anlegen."""
email = user.get("email")
if not email:
raise HTTPException(status_code=400, detail="Nutzer hat keine E-Mail-Adresse im Token")
if frequency not in ("daily",):
raise HTTPException(status_code=400, detail="Unsupported frequency")
sub_id = await create_subscription(
user_id=user["sub"],
email=email,
bundesland=bundesland or None,
partei=partei or None,
frequency=frequency,
)
return {"id": sub_id, "email": email, "bundesland": bundesland, "partei": partei, "frequency": frequency}
@app.get("/api/subscriptions")
async def subscription_list(user=Depends(get_current_user)):
"""Liste aller Abos. Admins erhalten alle Abos inkl. user_id-Feld; normale
User sehen nur ihre eigenen Abos."""
if not user:
return []
roles = user.get("roles") or []
if "admin" in roles:
return await list_all_subscriptions()
return await list_subscriptions(user["sub"])
@app.delete("/api/subscriptions/{sub_id}")
async def subscription_delete(sub_id: int, user: dict = Depends(require_auth)):
"""Abo löschen (nur eigenes)."""
ok = await delete_subscription(user["sub"], sub_id)
if not ok:
raise HTTPException(status_code=404, detail="Abo nicht gefunden")
return {"deleted": sub_id}
@app.get("/unsubscribe/{sub_id}/{token}", response_class=HTMLResponse)
async def unsubscribe(sub_id: int, token: str):
"""Unsubscribe-Link aus E-Mails — kein Login nötig (HMAC-Token verifiziert)."""
from .mail import verify_unsubscribe_token
if not verify_unsubscribe_token(sub_id, token):
return HTMLResponse(
"<html><body style='font-family:sans-serif;max-width:500px;margin:50px auto;padding:20px'>"
"<h2>Ungültiger Unsubscribe-Link</h2>"
"<p>Der Link ist nicht mehr gültig oder wurde manipuliert.</p>"
"<p><a href='/'>Zurück zur Startseite</a></p>"
"</body></html>",
status_code=400,
)
ok = await delete_subscription_by_id(sub_id)
msg = "Abo wurde abbestellt." if ok else "Abo war bereits gelöscht."
return HTMLResponse(
f"<html><body style='font-family:sans-serif;max-width:500px;margin:50px auto;padding:20px'>"
f"<h2>{msg}</h2>"
f"<p>Du bekommst keine weiteren Benachrichtigungen zu diesem Filter.</p>"
f"<p><a href='/'>Zurück zur Startseite</a></p>"
f"</body></html>"
)
@app.post("/api/comment")
async def comment_add(
drucksache: str = Form(...),
text: str = Form(...),
visibility: str = Form("all"),
user: dict = Depends(require_auth),
):
"""Kommentar hinzufügen."""
if len(text) > 2000:
raise HTTPException(status_code=400, detail="Kommentar zu lang (max 2000 Zeichen)")
return await add_comment(user["sub"], user.get("name", ""), drucksache, text, visibility)
@app.get("/api/comments")
async def comments_list(drucksache: str, user=Depends(get_current_user)):
"""Kommentare für einen Antrag."""
user_id = user["sub"] if user else None
return await get_comments(drucksache, user_id)
@app.delete("/api/comment/{comment_id}")
async def comment_delete(comment_id: int, user: dict = Depends(require_auth)):
"""Eigenen Kommentar löschen."""
deleted = await delete_comment(comment_id, user["sub"])
if not deleted:
raise HTTPException(status_code=404, detail="Kommentar nicht gefunden oder nicht Ihr Kommentar")
return {"status": "deleted"}
# ─── Assessment-History (#110) ────────────────────────────────────────────
@app.get("/api/assessment/history")
async def assessment_history(drucksache: str):
"""Versionshistorie eines Assessments."""
drucksache = validate_drucksache(drucksache)
return await get_assessment_history(drucksache)
# ─── Crowd-Validation / Votes (#112) ─────────────────────────────────────
@app.post("/api/vote")
async def vote_endpoint(
drucksache: str = Form(...),
target: str = Form("overall"),
vote: str = Form(...),
user: dict = Depends(require_auth),
):
"""Bewertung als treffend/fragwürdig markieren. Toggle: gleicher Vote nochmal = entfernen."""
drucksache = validate_drucksache(drucksache)
if vote not in ("up", "down"):
raise HTTPException(status_code=400, detail="vote muss 'up' oder 'down' sein")
if target not in ("overall",) and not target.startswith("matrix:") and not target.startswith("partei:"):
raise HTTPException(status_code=400, detail="Ungültiges target")
result = await upsert_vote(user["sub"], drucksache, target, vote)
return result
@app.get("/api/votes")
async def votes_endpoint(drucksache: str, user: dict = Depends(get_current_user)):
"""Aggregierte Votes + eigener Vote für eine Drucksache."""
drucksache = validate_drucksache(drucksache)
user_id = user["sub"] if user else None
return await get_votes(drucksache, user_id)
# ─── Registrierung (#103) ────────────────────────────────────────────────
@app.post("/api/auth/register")
@limiter.limit("3/hour")
async def auth_register(
request: Request,
firstName: str = Form(...),
lastName: str = Form(...),
email: str = Form(...),
username: str = Form(...),
):
"""Registrierung: erstellt User in Keycloak mit enabled=false.
Admin muss den Account manuell freischalten.
Kein Passwort nötig — nach Freischaltung sendet Keycloak eine
E-Mail zum Passwort-Setzen."""
import httpx as _httpx
admin_token = await keycloak_admin_token()
async with _httpx.AsyncClient(timeout=10) as client:
# User anlegen (disabled, ohne Passwort, mit required action)
create_resp = await client.post(
f"{settings.keycloak_url}/admin/realms/{settings.keycloak_realm}/users",
headers={"Authorization": f"Bearer {admin_token}", "Content-Type": "application/json"},
json={
"username": username,
"email": email,
"firstName": firstName,
"lastName": lastName,
"enabled": False,
"emailVerified": True,
"requiredActions": ["UPDATE_PASSWORD"],
},
)
if create_resp.status_code == 409:
raise HTTPException(status_code=409, detail="Benutzername oder E-Mail bereits vergeben")
if create_resp.status_code != 201:
raise HTTPException(status_code=500, detail="Registrierung fehlgeschlagen")
# #143: Bestätigungsmail an User direkt nach Anmeldung
try:
from .mail import send_mail
anrede = f"{firstName} {lastName}".strip() or username
text_body = (
f"Hallo {anrede},\n\n"
f"deine Registrierung am GWÖ-Antragsprüfer ist eingegangen.\n\n"
f"Was passiert jetzt?\n"
f" 1. Ein Admin schaltet deinen Account manuell frei.\n"
f" 2. Du erhältst dann eine separate Mail mit einem Link zum Passwort-Setzen.\n"
f" 3. Anschließend kannst du dich auf https://gwoe.toppyr.de/ anmelden.\n\n"
f"Falls du nach 1-2 Werktagen nichts hörst, melde dich gerne unter mail@tobiasroedel.de.\n\n"
f"Schöne Grüße\nGWÖ-Antragsprüfer"
)
html_body = (
f"<p>Hallo <strong>{anrede}</strong>,</p>"
f"<p>deine Registrierung am <a href=\"https://gwoe.toppyr.de/\">GWÖ-Antragsprüfer</a> ist eingegangen.</p>"
f"<p><strong>Was passiert jetzt?</strong></p>"
f"<ol>"
f"<li>Ein Admin schaltet deinen Account manuell frei.</li>"
f"<li>Du erhältst dann eine separate Mail mit einem Link zum Passwort-Setzen.</li>"
f"<li>Anschließend kannst du dich auf <a href=\"https://gwoe.toppyr.de/\">gwoe.toppyr.de</a> anmelden.</li>"
f"</ol>"
f"<p>Falls du nach 1-2 Werktagen nichts hörst, melde dich gerne unter <a href=\"mailto:mail@tobiasroedel.de\">mail@tobiasroedel.de</a>.</p>"
f"<p style=\"color:#666;font-size:0.9em\">Schöne Grüße<br>GWÖ-Antragsprüfer</p>"
)
await send_mail(email, "GWÖ-Antragsprüfer — Registrierung eingegangen", text_body, html_body)
except Exception:
logger.exception("Bestätigungsmail an %s fehlgeschlagen — User-Anlage war aber erfolgreich", email)
return {"status": "pending_approval", "message": "Registrierung eingegangen. Wir haben dir eine Bestätigung per E-Mail geschickt."}
@app.get("/api/auth/pending-users")
async def auth_pending_users(user: dict = Depends(require_admin)):
"""Liste nicht-freigeschalteter User (Admin-only)."""
import httpx as _httpx
admin_token = await keycloak_admin_token()
async with _httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{settings.keycloak_url}/admin/realms/{settings.keycloak_realm}/users?enabled=false&max=50",
headers={"Authorization": f"Bearer {admin_token}"},
)
users = resp.json() if resp.status_code == 200 else []
return [{"id": u["id"], "username": u.get("username"),
"firstName": u.get("firstName"), "lastName": u.get("lastName"),
"email": u.get("email"), "created": u.get("createdTimestamp")}
for u in users]
@app.post("/api/auth/approve-user")
async def auth_approve_user(
user_id: str = Form(...),
user: dict = Depends(require_admin),
):
"""User freischalten (Admin-only).
Aktiviert den User und triggert eine Keycloak-E-Mail zum Passwort setzen.
Voraussetzung: Keycloak Realm hat SMTP konfiguriert (Realm Settings → Email).
"""
import httpx as _httpx
admin_token = await keycloak_admin_token()
async with _httpx.AsyncClient(timeout=10) as client:
headers = {"Authorization": f"Bearer {admin_token}", "Content-Type": "application/json"}
base = f"{settings.keycloak_url}/admin/realms/{settings.keycloak_realm}/users/{user_id}"
# 1. User aktivieren
resp = await client.put(base, headers=headers, json={"enabled": True})
if resp.status_code != 204:
raise HTTPException(status_code=500, detail="Freischaltung fehlgeschlagen")
# 2. Passwort-Setzen-E-Mail senden (Keycloak execute-actions-email)
email_resp = await client.put(
f"{base}/execute-actions-email", headers=headers, json=["UPDATE_PASSWORD"],
)
email_sent = email_resp.status_code == 204
return {
"status": "approved",
"user_id": user_id,
"password_email_sent": email_sent,
}
# API: Load assessments from database
@app.get("/api/assessments")
async def list_assessments(bundesland: Optional[str] = None):
"""Return assessments from database, optionally filtered by Bundesland.
``bundesland="ALL"`` and missing parameter both mean "no filter".
"""
rows = await get_all_assessments(bundesland)
# Lightweight list format — nur Felder die für die Listenansicht nötig sind.
# Detail-Daten (Matrix, Scores, Verbesserungen) werden on-demand via
# GET /api/assessment?drucksache=X geladen (#122 Speicheroptimierung).
assessments = []
for row in rows:
assessments.append({
"drucksache": row.get("drucksache"),
"title": row.get("title"),
"fraktionen": row.get("fraktionen", []),
"datum": row.get("datum"),
"link": row.get("link"),
"bundesland": row.get("bundesland"),
"gwoeScore": row.get("gwoe_score"),
"empfehlung": row.get("empfehlung"),
"empfehlungSymbol": row.get("empfehlung_symbol"),
"themen": row.get("themen", []),
"updatedAt": row.get("updated_at"),
"konfidenz": row.get("konfidenz"),
})
return assessments
# API: Get single assessment (use query param for drucksache with /)
@app.get("/api/assessment")
async def get_single_assessment(drucksache: str):
"""Get a single assessment by drucksache ID."""
drucksache = validate_drucksache(drucksache)
row = await get_assessment(drucksache)
if not row:
raise HTTPException(status_code=404, detail="Assessment nicht gefunden")
return {
"drucksache": row.get("drucksache"),
"title": row.get("title"),
"fraktionen": row.get("fraktionen", []),
"datum": row.get("datum"),
"link": row.get("link"),
"bundesland": row.get("bundesland"),
"gwoeScore": row.get("gwoe_score"),
"gwoeBegründung": row.get("gwoe_begruendung"),
"gwoeMatrix": row.get("gwoe_matrix", []),
"gwoeSchwerpunkt": row.get("gwoe_schwerpunkt", []),
"wahlprogrammScores": row.get("wahlprogramm_scores", []),
"verbesserungen": row.get("verbesserungen", []),
"stärken": row.get("staerken", []),
"schwächen": row.get("schwaechen", []),
"empfehlung": row.get("empfehlung"),
"empfehlungSymbol": row.get("empfehlung_symbol"),
"verbesserungspotenzial": row.get("verbesserungspotenzial"),
"themen": row.get("themen", []),
"antragZusammenfassung": row.get("antrag_zusammenfassung"),
"antragKernpunkte": row.get("antrag_kernpunkte", []),
"updatedAt": row.get("updated_at"),
"source": row.get("source"),
"model": row.get("model"),
"konfidenz": row.get("konfidenz"),
"fehlendeProgramme": row.get("fehlende_programme") or [],
}
# API: Delete assessment for re-analysis (#97)
@app.delete("/api/assessment/delete")
async def delete_assessment_endpoint(
drucksache: str,
user: dict = Depends(require_admin),
):
"""Löscht ein Assessment, damit es neu analysiert werden kann."""
drucksache = validate_drucksache(drucksache)
deleted = await delete_assessment(drucksache)
if not deleted:
raise HTTPException(status_code=404, detail="Assessment nicht gefunden")
return {"status": "deleted", "drucksache": drucksache}
# API: Generate PDF on demand for an assessment
@app.get("/api/assessment/pdf")
async def download_assessment_pdf(drucksache: str):
"""Generate and download PDF for an assessment."""
from .models import Assessment
drucksache = validate_drucksache(drucksache)
row = await get_assessment(drucksache)
if not row:
raise HTTPException(status_code=404, detail="Assessment nicht gefunden")
# Check if PDF already exists
safe_name = drucksache.replace("/", "-")
pdf_path = settings.reports_dir / f"{safe_name}.pdf"
if not pdf_path.exists():
# Convert DB row to Assessment model for report generation
assessment_data = {
"drucksache": row.get("drucksache"),
"title": row.get("title"),
"fraktionen": row.get("fraktionen", []),
"datum": row.get("datum"),
"link": row.get("link"),
"gwoe_score": row.get("gwoe_score") or 0,
"gwoe_begruendung": row.get("gwoe_begruendung") or "",
"gwoe_matrix": row.get("gwoe_matrix", []),
"gwoe_schwerpunkt": row.get("gwoe_schwerpunkt", []),
"wahlprogramm_scores": row.get("wahlprogramm_scores", []),
"verbesserungen": row.get("verbesserungen", []),
"staerken": row.get("staerken", []),
"schwaechen": row.get("schwaechen", []),
"empfehlung": row.get("empfehlung") or "",
"empfehlung_symbol": row.get("empfehlung_symbol") or "",
"verbesserungspotenzial": row.get("verbesserungspotenzial") or "",
"themen": row.get("themen", []),
"antrag_zusammenfassung": row.get("antrag_zusammenfassung") or "",
"antrag_kernpunkte": row.get("antrag_kernpunkte", []),
}
try:
assessment = Assessment(**assessment_data)
await generate_pdf_report(
assessment,
pdf_path,
bundesland=row.get("bundesland"),
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"PDF-Generierung fehlgeschlagen: {e}")
return FileResponse(
pdf_path,
media_type="application/pdf",
filename=f"gwoe-{safe_name}.pdf"
)
# API: Search internal DB only
@app.get("/api/search")
async def search_internal(
q: str,
bundesland: str = "NRW",
limit: int = 50
):
"""
Search internal assessments database only.
"""
q = validate_search_query(q)
db_results = await search_assessments(q, bundesland, limit)
results = []
for row in db_results:
results.append({
"drucksache": row.get("drucksache"),
"title": row.get("title"),
"fraktionen": row.get("fraktionen", []),
"datum": row.get("datum"),
"link": row.get("link"),
"bundesland": bundesland,
"gwoeScore": row.get("gwoe_score"),
"themen": row.get("themen", []),
"status": "checked",
})
return results
# API: Search external parliament portal (Landtag)
@app.get("/api/search-landtag")
async def search_landtag(
q: str,
bundesland: str = "NRW",
limit: int = 20
):
"""
Search external parliament portal (e.g., NRW OPAL).
Returns results that can be analyzed with "Jetzt prüfen".
Requires a concrete Bundesland — the special "ALL" / Bundesweit mode
cannot pick a single Landtag adapter and is rejected with HTTP 400.
"""
q = validate_search_query(q)
if not bundesland or bundesland == "ALL":
raise HTTPException(
status_code=400,
detail="Landtag-Suche benötigt ein konkretes Bundesland",
)
adapter = get_adapter(bundesland)
if not adapter:
return {"error": f"Bundesland {bundesland} noch nicht unterstützt"}
try:
external = adapter._filter_abstimmbar(await adapter.search(q, limit))
# Zusätzliche Title-Heuristik: bei Adaptern die Typ='Drucksache' liefern
# (NRW), Kleine-Anfrage-Frage-Pattern erkennen und ausfiltern.
from .drucksache_typen import likely_kleine_anfrage_titel, KLEINE_ANFRAGE
results = []
for doc in external:
if doc.typ_normiert == "sonstige" and likely_kleine_anfrage_titel(doc.title):
continue # höchstwahrscheinlich Kleine Anfrage
results.append({
"drucksache": doc.drucksache,
"title": doc.title,
"fraktionen": doc.fraktionen,
"datum": doc.datum,
"link": doc.link,
"bundesland": bundesland,
"typ": doc.typ,
"typ_normiert": doc.typ_normiert,
"gwoeScore": None,
"status": "unchecked",
})
return results
except Exception as e:
logger.exception("Landtag search error for q=%r bundesland=%s", q, bundesland)
return {"error": f"Suchfehler: {str(e)}"}
# API: Batch-Analyse (#44) — enqueued ungeprüfte Drucksachen eines BL
@app.post("/api/batch-analyze")
@limiter.limit("3/minute")
async def batch_analyze(
request: Request,
bundesland: str = Form(...),
limit: int = Form(10),
user: dict = Depends(require_admin),
):
"""Sucht die neuesten Drucksachen im Landtag-Portal und enqueued
alle, die noch nicht in der DB bewertet sind.
Returns: Liste der enqueued Drucksachen + Queue-Position.
"""
from .queue import enqueue, QueueFullError
if limit < 1 or limit > 100:
raise HTTPException(status_code=400, detail="limit muss 1-100 sein")
adapter = get_adapter(bundesland)
if not adapter:
raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt")
# Neueste Drucksachen vom Landtag holen, gefiltert auf abstimmbare Typen (#127).
drucksachen = adapter._filter_abstimmbar(await adapter.search("", limit=limit * 10))
enqueued = []
skipped = 0
for doc in drucksachen:
if len(enqueued) >= limit:
break
# Schon bewertet?
existing = await get_assessment(doc.drucksache)
if existing:
skipped += 1
continue
# Text herunterladen
text = await adapter.download_text(doc.drucksache)
if not text:
continue
# Enqueue
job_id = str(uuid.uuid4())
await create_job(job_id, text[:500], bundesland, "qwen-plus", drucksache=doc.drucksache)
try:
position = await enqueue(
job_id,
run_drucksache_analysis,
job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
drucksache=doc.drucksache,
)
enqueued.append({
"drucksache": doc.drucksache,
"title": doc.title,
"job_id": job_id,
"queue_position": position,
})
except QueueFullError:
break
return {
"status": "batch_enqueued",
"bundesland": bundesland,
"enqueued": len(enqueued),
"skipped_existing": skipped,
"jobs": enqueued,
}
# API: Analyze a document from parliament portal
@app.post("/api/analyze-drucksache")
@limiter.limit("10/minute")
async def analyze_drucksache(
request: Request,
background_tasks: BackgroundTasks,
drucksache: str = Form(...),
bundesland: str = Form("NRW"),
model: str = Form("qwen-plus"),
user: dict = Depends(require_auth),
):
"""
Download a document from parliament portal and analyze it.
"""
drucksache = validate_drucksache(drucksache)
# Check if already analyzed
existing = await get_assessment(drucksache)
if existing:
return {"status": "already_checked", "drucksache": drucksache}
# Get adapter and download
adapter = get_adapter(bundesland)
if not adapter:
raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt")
# Download text
text = await adapter.download_text(drucksache)
if not text:
raise HTTPException(status_code=404, detail=f"Dokument {drucksache} nicht gefunden")
# Get document metadata
doc = await adapter.get_document(drucksache)
# #127: Typ-Check — nur abstimmbare Drucksachen analysieren.
# Falls der Adapter den Typ nicht richtig setzt (NRW: "Drucksache"),
# versuche den Typ aus dem Dokument-Text zu erkennen.
from .drucksache_typen import ist_abstimmbar_original, normalize_typ
doc_typ = doc.typ if doc else "Drucksache"
if doc_typ in ("Drucksache", ""):
for kw in ["Kleine Anfrage", "Große Anfrage", "Gesetzentwurf",
"Änderungsantrag", "Entschließungsantrag",
"Dringlichkeitsantrag", "Beschlussempfehlung",
"Unterrichtung", "Antrag"]:
if kw in text[:500]:
doc_typ = kw
break
if not ist_abstimmbar_original(doc_typ):
return {
"status": "skipped",
"drucksache": drucksache,
"typ": doc_typ,
"typ_normiert": normalize_typ(doc_typ),
"reason": f"Typ '{doc_typ}' ist nicht abstimmbar — keine GWÖ-Bewertung sinnvoll",
}
# Create job and enqueue (#95)
from .queue import enqueue, QueueFullError
job_id = str(uuid.uuid4())
await create_job(job_id, text[:500], bundesland, model, drucksache=drucksache)
try:
position = await enqueue(
job_id,
run_drucksache_analysis,
job_id, drucksache, text, bundesland, model, doc,
drucksache=drucksache,
)
except QueueFullError:
await update_job(job_id, status="rejected", error="Queue voll")
raise HTTPException(
status_code=429,
detail="Analyse-Queue ist voll. Bitte später erneut versuchen.",
headers={"Retry-After": "60"},
)
return {
"status": "queued",
"job_id": job_id,
"drucksache": drucksache,
"queue_position": position,
}
async def run_drucksache_analysis(
job_id: str,
drucksache: str,
text: str,
bundesland: str,
model: str,
doc
):
"""Background task for drucksache analysis."""
try:
await update_job(job_id, status="processing")
# Run LLM analysis
assessment = await analyze_antrag(text, bundesland, model)
# Prepare data for DB
assessment_data = {
"drucksache": drucksache,
# Titel-Priorität: LLM-generierter Titel > doc.title,
# ABER nur wenn doc.title ein echter Titel ist (nicht "Drucksache XX",
# wie NRW's get_document zurückgibt). Sonst überschreibt der
# generische doc.title den echten LLM-Titel.
"title": _pick_best_title(assessment.title, doc.title if doc else None, drucksache),
"fraktionen": assessment.fraktionen,
"datum": assessment.datum or (doc.datum if doc else ""),
"link": doc.link if doc else "",
"bundesland": bundesland,
"gwoeScore": assessment.gwoe_score,
"gwoeBegründung": assessment.gwoe_begruendung,
"gwoeMatrix": [m.model_dump() for m in assessment.gwoe_matrix],
"gwoeSchwerpunkt": assessment.gwoe_schwerpunkt,
"wahlprogrammScores": [w.model_dump() for w in assessment.wahlprogramm_scores],
"verbesserungen": [v.model_dump() for v in assessment.verbesserungen],
"stärken": assessment.staerken,
"schwächen": assessment.schwaechen,
"empfehlung": assessment.empfehlung,
"empfehlungSymbol": assessment.empfehlung_symbol,
"verbesserungspotenzial": assessment.verbesserungspotenzial,
"themen": assessment.themen,
"antragZusammenfassung": assessment.antrag_zusammenfassung,
"antragKernpunkte": assessment.antrag_kernpunkte,
"konfidenz": getattr(assessment, 'konfidenz', None),
"share_threads": getattr(assessment, 'share_threads', None),
"share_twitter": getattr(assessment, 'share_twitter', None),
"share_mastodon": getattr(assessment, 'share_mastodon', None),
"fehlendeProgramme": getattr(assessment, 'fehlende_programme', None) or [],
"source": "webapp",
"model": model,
}
# #123: Assessment-Embedding für Clustering/Ähnlichkeit erzeugen.
# Fällt bei API-Fehler auf (None, None) zurück — Backfill-Script
# zieht nach. Kein Show-Stopper für das Assessment selbst.
from .embeddings import create_assessment_embedding
emb_blob, emb_model = create_assessment_embedding(
title=assessment_data["title"],
zusammenfassung=assessment.antrag_zusammenfassung,
themen=assessment.themen,
bundesland=bundesland,
)
assessment_data["summary_embedding"] = emb_blob
assessment_data["embedding_model"] = emb_model
# #133: Social-Media-Texte separat generieren (schneller, zuverlässiger
# als sie im Haupt-Prompt unterzubringen — Qwen ignoriert sie dort)
try:
from openai import OpenAI
social_client = OpenAI(
api_key=settings.dashscope_api_key,
base_url=settings.dashscope_base_url,
)
social_prompt = (
f"Generiere Social-Media-Posts für diesen Parlamentsantrag:\n\n"
f"Titel: {assessment_data['title']}\n"
f"GWÖ-Score: {assessment_data['gwoeScore']}/10\n"
f"Empfehlung: {assessment_data.get('empfehlung', '')}\n"
f"Fraktionen: {', '.join(assessment_data.get('fraktionen', []))}\n"
f"Themen: {', '.join(assessment_data.get('themen', []))}\n"
f"Bundesland: {bundesland}\n"
f"Zusammenfassung: {(assessment.antrag_zusammenfassung or '')[:300]}\n\n"
f"Antworte NUR mit JSON:\n"
f'{{"shareThreads": "Post für Threads/Instagram (max 500 Zeichen, Emojis, konkret auf den Antrag, CTA, Hashtags #Gemeinwohl #GWÖ + 2 thematische)",'
f' "shareTwitter": "Tweet für X/Twitter (max 280 Zeichen, knackig, pointiert, 2 Hashtags)",'
f' "shareMastodon": "Post für Mastodon (max 500 Zeichen, sachlich-informativ, Kontext, Quellenhinweis)"}}'
)
social_resp = social_client.chat.completions.create(
model="qwen-plus-latest",
messages=[{"role": "user", "content": social_prompt}],
temperature=0.7,
response_format={"type": "json_object"},
)
import json as _json
social_data = _json.loads(social_resp.choices[0].message.content)
assessment_data["share_threads"] = social_data.get("shareThreads", "")
assessment_data["share_twitter"] = social_data.get("shareTwitter", "")
assessment_data["share_mastodon"] = social_data.get("shareMastodon", "")
logger.info("Social-Texte generiert für %s", drucksache)
except Exception:
logger.exception("Social-Text-Generierung fehlgeschlagen für %s", drucksache)
# Save to DB
await upsert_assessment(assessment_data)
# Generate reports
html_path = settings.reports_dir / f"{job_id}.html"
pdf_path = settings.reports_dir / f"{job_id}.pdf"
await generate_html_report(assessment, html_path, bundesland=bundesland)
await generate_pdf_report(assessment, pdf_path, bundesland=bundesland)
await update_job(
job_id,
status="completed",
result=assessment.model_dump_json(),
html_path=str(html_path),
pdf_path=str(pdf_path),
)
except Exception as e:
# Volltext-Stack via logger.exception, NICHT via print — landet so im
# strukturierten Container-Log und wird vom logging-Framework formatiert
logger.exception("run_drucksache_analysis failed for drucksache=%s", drucksache)
await update_job(job_id, status="failed", error=str(e))
# API: List available Bundesländer
@app.get("/api/bundeslaender")
async def list_bundeslaender():
"""List available bundesländer with their status.
Includes the synthetic "ALL" / Bundesweit entry as the first item so
that the frontend can render it directly. ``parlament_name`` is added
so the detail view can show the source parliament without an extra
backend round-trip.
"""
out = [{
"code": "ALL",
"name": "🌍 Bundesweit",
"parlament_name": None,
"active": True,
}]
out.extend({
"code": bl.code,
"name": bl.name,
"parlament_name": bl.parlament_name,
"active": bl.aktiv,
} for bl in alle_bundeslaender())
return out
# === Impressum / Datenschutz ===
@app.get("/impressum", response_class=HTMLResponse)
async def impressum_page(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
return templates.TemplateResponse("v2/screens/legal.html", {
"request": request, "app_name": settings.app_name,
"title": "Impressum", "section": "impressum",
**_v2_template_context(current_user),
})
@app.get("/datenschutz", response_class=HTMLResponse)
async def datenschutz_page(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
return templates.TemplateResponse("v2/screens/legal.html", {
"request": request, "app_name": settings.app_name,
"title": "Datenschutzerklärung", "section": "datenschutz",
**_v2_template_context(current_user),
})
# === Quellen / Programme ===
@app.get("/methodik", response_class=HTMLResponse)
async def methodik_page(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
"""Transparenz-/Methodik-Seite (#96)."""
from .bundeslaender import aktive_bundeslaender, BUNDESLAENDER
from .embeddings import get_indexing_status
bl_list = []
for bl in aktive_bundeslaender():
bl_list.append({
"code": bl.code,
"name": bl.name,
"doku_system": bl.doku_system,
})
status = get_indexing_status()
return templates.TemplateResponse("v2/screens/methodik.html", {
"request": request,
"app_name": settings.app_name,
"adapter_count": len(ADAPTERS),
"model_name": settings.llm_model_default,
"embedding_model": settings.embedding_model_read,
"programme_count": status.get("total", 0),
"chunk_count": sum(p.get("chunks", 0) for p in status.get("programmes", [])),
"bundeslaender": sorted(bl_list, key=lambda x: x["name"]),
**_v2_template_context(current_user),
})
@app.get("/quellen", response_class=HTMLResponse)
async def quellen_page(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
"""Quellen-Seite mit allen Wahl- und Parteiprogrammen, nach BL gruppiert."""
from .bundeslaender import BUNDESLAENDER
programmes = get_programme_info()
status = get_indexing_status()
# Wahlprogramme nach Bundesland gruppieren
by_bl: dict[str, list] = {}
grundsatz = []
for prog in programmes:
if prog["typ"] == "parteiprogramm":
grundsatz.append(prog)
else:
bl = prog.get("bundesland") or "Sonstige"
bl_name = BUNDESLAENDER[bl].name if bl in BUNDESLAENDER else bl
by_bl.setdefault(bl_name, []).append(prog)
# Sortieren: alphabetisch nach BL-Name
wahlprogramme_grouped = sorted(by_bl.items())
return templates.TemplateResponse("v2/screens/quellen.html", {
"request": request,
"app_name": settings.app_name,
"programmes": programmes,
"wahlprogramme_grouped": wahlprogramme_grouped,
"grundsatzprogramme": grundsatz,
"status": status,
**_v2_template_context(current_user),
})
@app.get("/api/wahlprogramm-cite")
async def wahlprogramm_cite(
request: Request,
background_tasks: BackgroundTasks,
pid: str = "", pdf: str = "", seite: int = 1, q: str = "",
ds: str = "", bl: str = "",
):
"""Render eine Wahlprogramm-Seite mit gelb hervorgehobener Zitat-Stelle.
Issue #47: Klick auf eine Zitat-Quelle im Report soll direkt zur
Stelle im Wahlprogramm-PDF springen, mit dem zitierten Snippet
visuell markiert. Statt das ganze PDF auszuliefern (Browser scrollt
auf #page=N und Leser muss von Hand suchen), liefern wir hier ein
1-Seiten-PDF mit ``add_highlight_annot``-Annotation auf den per
``page.search_for`` gefundenen Bounding-Boxes.
Akzeptiert ``pid`` (PROGRAMME-Key) ODER ``pdf`` (Dateiname wie
``spd-grundsatzprogramm.pdf``). Letzterer ermöglicht die retroaktive
Nutzung von Pre-#47-URLs im Frontend, wo nur der statische Pfad
``/static/referenzen/<pdf>#page=<N>`` gespeichert ist.
Security: ``pid`` muss ein registrierter PROGRAMME-Key sein —
verhindert Path-Traversal und arbiträren File-Read aus dem
referenzen-Verzeichnis. ``seite`` wird per Pydantic-Coercion
auf int gezwungen. ``q`` ist auf 200 Zeichen begrenzt im Renderer.
"""
# Reverse-Lookup: pdf-Filename → programm_id, falls nur pdf angegeben.
# Zwei Stufen: exakter Match, dann fuzzy (Year-Suffix-Stripping), weil
# Pre-#47 Assessments halluzinierte Dateinamen haben können, z.B.
# "gruene-grundsatzprogramm-2020.pdf" statt "gruene-grundsatzprogramm.pdf".
if not pid and pdf:
# Stage 1: exakt
for p, info in PROGRAMME.items():
if info.get("pdf") == pdf:
pid = p
break
# Stage 2: Year-Suffix stripping (z.B. "X-2020.pdf" → "X.pdf")
if not pid:
import re
stripped = re.sub(r"-\d{4}\.pdf$", ".pdf", pdf)
if stripped != pdf:
for p, info in PROGRAMME.items():
if info.get("pdf") == stripped:
pid = p
break
if pid not in PROGRAMME:
raise HTTPException(status_code=404, detail="Unbekanntes Wahlprogramm")
if seite < 1 or seite > 2000:
raise HTTPException(status_code=400, detail="Ungültige Seitennummer")
pdf_bytes, found_page, highlighted = render_highlighted_page(pid, seite, q)
if pdf_bytes is None:
raise HTTPException(
status_code=404,
detail="Wahlprogramm-PDF oder Seite nicht verfügbar",
)
# Issue #47: Wenn das Zitat nicht im PDF auffindbar ist UND wir die
# Drucksache kennen, ist das Assessment wahrscheinlich ein Pre-#60-
# Halluzinations-Opfer. Automatische Re-Analyse triggern und dem
# User eine Warte-Seite zeigen statt ein PDF ohne Highlights.
if not highlighted and q and ds and bl:
existing = await get_assessment(ds)
if existing:
adapter = get_adapter(bl)
if adapter:
# Altes Assessment löschen und neu analysieren
await delete_assessment(ds)
job_id = str(uuid.uuid4())
await create_job(job_id, f"Re-Analyse {ds} (Zitat nicht verifizierbar)", bl, "qwen-plus")
text = await adapter.download_text(ds)
if text:
doc = await adapter.get_document(ds)
background_tasks.add_task(
run_drucksache_analysis,
job_id, ds, text, bl, "qwen-plus", doc,
)
# HTML-Warte-Seite mit Auto-Redirect zurück zum Assessment
return HTMLResponse(f"""<!DOCTYPE html>
<html><head><meta charset="utf-8">
<meta http-equiv="refresh" content="15;url=/#assessment={ds}">
<title>Wird neu analysiert…</title>
<style>body{{font-family:sans-serif;display:flex;justify-content:center;align-items:center;height:100vh;margin:0;background:#f5f5f5}}
.box{{text-align:center;padding:2rem;background:#fff;border-radius:8px;box-shadow:0 2px 8px rgba(0,0,0,.1)}}
.spinner{{width:40px;height:40px;border:4px solid #ddd;border-top:4px solid #009da5;border-radius:50%;animation:spin 1s linear infinite;margin:1rem auto}}
@keyframes spin{{to{{transform:rotate(360deg)}}}}</style></head>
<body><div class="box">
<div class="spinner"></div>
<h2>Zitat nicht verifizierbar</h2>
<p>Der Antrag <strong>{ds}</strong> wird mit der aktuellen Pipeline<br>
neu analysiert, um verifizierte Zitate zu erzeugen.</p>
<p style="color:#666;font-size:0.9rem">Automatische Weiterleitung in 15 Sekunden…</p>
</div></body></html>""")
info = PROGRAMME[pid]
safe_name = info.get("pdf", f"{pid}.pdf")
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={
"Content-Disposition": f'inline; filename="{safe_name}"',
"Cache-Control": "public, max-age=3600",
"X-Found-Page": str(found_page),
},
)
@app.get("/api/programme/thumbnail/{programm_id}")
async def programme_thumbnail(programm_id: str):
"""Thumbnail der ersten Seite eines Wahlprogramm-PDFs (PNG, 200px breit).
Wird auf der Quellen-Seite als Vorschau angezeigt. Cached 24h.
"""
import fitz
if programm_id not in PROGRAMME:
raise HTTPException(status_code=404)
info = PROGRAMME[programm_id]
pdf_path = static_dir / "referenzen" / info["pdf"]
if not pdf_path.exists():
raise HTTPException(status_code=404)
try:
doc = fitz.open(str(pdf_path))
page = doc[0]
# 200px Breite, proportional skaliert
zoom = 200 / page.rect.width
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
png_bytes = pix.tobytes("png")
doc.close()
return Response(
content=png_bytes,
media_type="image/png",
headers={"Cache-Control": "public, max-age=86400"},
)
except Exception:
raise HTTPException(status_code=500)
@app.get("/api/programme")
async def list_programme():
"""List all available programmes."""
return get_programme_info()
@app.get("/api/programme/status")
async def programme_status():
"""Get indexing status of all programmes."""
return get_indexing_status()
@app.post("/api/programme/index")
@limiter.limit("3/minute")
async def index_programme(
request: Request,
background_tasks: BackgroundTasks,
programm_id: str = Form(None),
all_programmes: bool = Form(False),
user: dict = Depends(require_admin),
):
"""Index programme(s) for semantic search."""
pdf_dir = static_dir / "referenzen"
if all_programmes:
# Index sequentially to avoid DB locks
async def index_all_sequential():
for prog_id in PROGRAMME.keys():
try:
index_programm(prog_id, pdf_dir)
except Exception:
logger.exception("Error indexing programme %s", prog_id)
background_tasks.add_task(index_all_sequential)
return {"status": "indexing", "programmes": list(PROGRAMME.keys())}
if programm_id and programm_id in PROGRAMME:
background_tasks.add_task(index_programm, programm_id, pdf_dir)
return {"status": "indexing", "programm_id": programm_id}
raise HTTPException(status_code=400, detail="Ungültiges Programm")
# ─────────────────────────────────────────────────────────────────────────────
# Auswertungen #58 — Bundesland × Partei × Wahlperiode Aggregations-Sicht
# ─────────────────────────────────────────────────────────────────────────────
@app.get("/auswertungen", response_class=HTMLResponse)
async def auswertungen_page(request: Request, current_user: dict = Depends(require_auth)):
"""Auswertungs-Dashboard in v2 (Phase 3 Migration aus Classic). Auth-only."""
from .auswertungen import get_wahlperioden
from .bundeslaender import alle_bundeslaender
wahlperioden = get_wahlperioden()
bl_codes = sorted(bl.code for bl in alle_bundeslaender() if bl.aktiv)
return templates.TemplateResponse("v2/screens/auswertungen.html", {
"request": request,
"app_name": settings.app_name,
"v2_active_nav": "auswertungen",
"wahlperioden": wahlperioden,
"bl_codes": bl_codes,
**_v2_template_context(current_user),
})
@app.get("/api/auswertungen/matrix")
async def auswertungen_matrix(
wahlperiode: Optional[str] = None,
bundesland: Optional[str] = None,
):
"""2D-Matrix Bundesland × Partei mit Anzahl + Ø-GWÖ-Score."""
from .auswertungen import aggregate_matrix
return aggregate_matrix(filter_wp=wahlperiode, filter_bl=bundesland)
@app.get("/api/auswertungen/zeitreihe")
async def auswertungen_zeitreihe(bundesland: str, partei: str):
"""Score-Verlauf einer (BL, Partei)-Kombination über alle WPs."""
from .auswertungen import aggregate_zeitreihe
return aggregate_zeitreihe(bundesland, partei)
@app.get("/api/auswertungen/themen-matrix")
async def auswertungen_themen_matrix(min_count: int = 3, bundesland: Optional[str] = None):
"""Thema × Fraktion Heatmap (#105 Integration in Auswertungen).
Zeigt die Ø-GWÖ-Scores pro Thema und Fraktion. Nur Themen mit
mindestens `min_count` Assessments werden angezeigt. Optional auf
ein Bundesland einschränken.
"""
import json as _json
from collections import Counter, defaultdict
from .parteien import normalize_partei
rows = await get_all_assessments(None)
heatmap: dict[str, dict[str, list]] = defaultdict(lambda: defaultdict(list))
thema_counts: Counter = Counter()
for row in rows:
if bundesland is not None and row.get("bundesland") != bundesland:
continue
fraktionen = row.get("fraktionen") or []
themen = row.get("themen") or []
score = row.get("gwoe_score")
if score is None:
continue
for thema in themen[:3]:
thema_counts[thema] += 1
for frak in fraktionen:
# Normalisiere Fraktion (AfD/AFD, LINKE/DIE LINKE)
norm = normalize_partei(frak) or frak
heatmap[thema][norm].append(score)
# Top-Themen filtern
top_themen = [t for t, n in thema_counts.most_common(20) if n >= min_count]
# Alle vorkommenden Fraktionen sammeln
all_fraktionen = sorted({f for t in top_themen for f in heatmap[t]})
# Matrix bauen
cells = {}
for thema in top_themen:
cells[thema] = {}
for frak in all_fraktionen:
scores = heatmap[thema].get(frak, [])
if scores:
cells[thema][frak] = {
"avg": round(sum(scores) / len(scores), 1),
"n": len(scores),
}
return {
"themen": top_themen,
"fraktionen": all_fraktionen,
"cells": cells,
"total": len(rows),
}
@app.get("/api/auswertungen/export.json")
async def auswertungen_export_json():
"""Open-Data-JSON-Export aller Assessments (#113).
Vollständiger Download aller Bewertungen als JSON mit Metadaten.
Lizenz: CC BY 4.0 (öffentliche parlamentarische Dokumente + KI-Analyse).
"""
from datetime import datetime
rows = await get_all_assessments(None)
assessments = []
for row in rows:
assessments.append({
"drucksache": row.get("drucksache"),
"title": row.get("title"),
"fraktionen": row.get("fraktionen", []),
"datum": row.get("datum"),
"link": row.get("link"),
"bundesland": row.get("bundesland"),
"gwoe_score": row.get("gwoe_score"),
"gwoe_begruendung": row.get("gwoe_begruendung"),
"gwoe_matrix": row.get("gwoe_matrix", []),
"gwoe_schwerpunkt": row.get("gwoe_schwerpunkt", []),
"wahlprogramm_scores": row.get("wahlprogramm_scores", []),
"verbesserungen": row.get("verbesserungen", []),
"staerken": row.get("staerken", []),
"schwaechen": row.get("schwaechen", []),
"empfehlung": row.get("empfehlung"),
"themen": row.get("themen", []),
"antrag_zusammenfassung": row.get("antrag_zusammenfassung"),
"antrag_kernpunkte": row.get("antrag_kernpunkte", []),
"model": row.get("model"),
"updated_at": row.get("updated_at"),
})
payload = {
"meta": {
"name": "GWÖ-Antragsprüfer Open Data",
"description": "Automatische Gemeinwohl-Bilanzierung von Parlamentsanträgen nach der GWÖ-Matrix 2.0",
"url": "https://gwoe.toppyr.de",
"license": "CC BY 4.0",
"license_url": "https://creativecommons.org/licenses/by/4.0/",
"exported_at": datetime.utcnow().isoformat() + "Z",
"count": len(assessments),
},
"assessments": assessments,
}
import json as _json
content = _json.dumps(payload, ensure_ascii=False, indent=2)
return Response(
content=content,
media_type="application/json",
headers={
"Content-Disposition": 'attachment; filename="gwoe-assessments.json"',
"Access-Control-Allow-Origin": "*",
},
)
@app.get("/api/clusters")
async def clusters_api(bundesland: Optional[str] = None, threshold: Optional[float] = None):
"""Antrag-Cluster (#105) per Cosine-Similarity über v4-Embeddings.
Wenn threshold nicht angegeben, nutze den Default aus clustering.py
(0.55, empirisch kalibriert für die aktuelle Prod-DB).
"""
from .clustering import build_hierarchy, DEFAULT_THRESHOLD
return await build_hierarchy(
bundesland=bundesland,
threshold=threshold if threshold is not None else DEFAULT_THRESHOLD,
)
@app.get("/api/assessment/similar")
async def assessment_similar(drucksache: str, top_k: int = 5):
"""Ähnliche Anträge zum gegebenen (#108 Teil B)."""
from .clustering import find_similar_assessments
return await find_similar_assessments(drucksache=drucksache, top_k=min(max(1, top_k), 20))
@app.get("/api/feed.xml")
async def feed_xml(request: Request, bundesland: Optional[str] = None, partei: Optional[str] = None, limit: int = 50):
"""Atom 1.0 Feed der neuesten Bewertungen (Issue #125).
Query-Parameter:
- bundesland: optionaler BL-Code (NRW, MV, BE, …)
- partei: optionaler Partei-Filter (CDU, SPD, GRÜNE, …) — matcht gegen fraktionen-Liste
- limit: Anzahl Einträge (default 50, max 200)
"""
from datetime import datetime, timezone
import hashlib
import html
limit = min(max(1, limit), 200)
rows = await get_all_assessments(bundesland if bundesland else None)
if partei:
partei_norm = partei.upper()
rows = [r for r in rows if any(partei_norm in (f or "").upper() for f in (r.get("fraktionen") or []))]
rows.sort(key=lambda r: r.get("updated_at") or "", reverse=True)
rows = rows[:limit]
base_url = "https://gwoe.toppyr.de"
feed_id_parts = ["gwoe-feed"]
if bundesland:
feed_id_parts.append(bundesland)
if partei:
feed_id_parts.append(partei)
feed_id = "urn:" + ":".join(feed_id_parts)
title_parts = ["GWÖ-Antragsprüfer — Neue Bewertungen"]
if bundesland:
title_parts.append(bundesland)
if partei:
title_parts.append(partei)
feed_title = " · ".join(title_parts)
latest_updated = rows[0].get("updated_at") if rows else datetime.utcnow().isoformat() + "Z"
if latest_updated and not latest_updated.endswith("Z"):
latest_updated = latest_updated + "Z"
etag_src = f"{len(rows)}:{latest_updated}:{bundesland}:{partei}"
etag = '"' + hashlib.md5(etag_src.encode()).hexdigest() + '"'
if request.headers.get("if-none-match") == etag:
return Response(status_code=304)
def _entry(row):
drucksache = row.get("drucksache") or ""
title = html.escape(row.get("title") or drucksache)
score = row.get("gwoe_score")
empfehlung = row.get("empfehlung") or ""
zusammenfassung = row.get("antrag_zusammenfassung") or ""
fraktionen = ", ".join(row.get("fraktionen") or [])
bl = row.get("bundesland") or ""
updated = row.get("updated_at") or ""
if updated and not updated.endswith("Z"):
updated += "Z"
detail_url = f"{base_url}/?drucksache={drucksache}"
entry_id = f"urn:gwoe:{bl}:{drucksache}"
summary_parts = []
if score is not None:
summary_parts.append(f"GWÖ-Score: {score}/10")
if empfehlung:
summary_parts.append(f"Empfehlung: {empfehlung}")
if fraktionen:
summary_parts.append(f"Fraktionen: {fraktionen}")
if zusammenfassung:
summary_parts.append(zusammenfassung)
summary = html.escape("".join(summary_parts))
return f""" <entry>
<id>{html.escape(entry_id)}</id>
<title>{title}</title>
<link href="{html.escape(detail_url)}"/>
<updated>{html.escape(updated)}</updated>
<summary>{summary}</summary>
<category term="{html.escape(bl)}"/>
</entry>"""
entries_xml = "\n".join(_entry(r) for r in rows)
self_url = f"{base_url}/api/feed.xml"
if bundesland or partei:
params = []
if bundesland:
params.append(f"bundesland={bundesland}")
if partei:
params.append(f"partei={partei}")
self_url += "?" + "&".join(params)
xml = f"""<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<id>{html.escape(feed_id)}</id>
<title>{html.escape(feed_title)}</title>
<subtitle>Automatische Gemeinwohl-Bilanzierung von Parlamentsanträgen</subtitle>
<link href="{html.escape(base_url)}" rel="alternate" type="text/html"/>
<link href="{html.escape(self_url)}" rel="self" type="application/atom+xml"/>
<updated>{html.escape(latest_updated)}</updated>
<author><name>GWÖ-Antragsprüfer</name></author>
<rights>CC BY 4.0</rights>
{entries_xml}
</feed>
"""
return Response(
content=xml,
media_type="application/atom+xml; charset=utf-8",
headers={
"ETag": etag,
"Cache-Control": "public, max-age=600",
"Access-Control-Allow-Origin": "*",
},
)
@app.get("/api/auswertungen/export.csv")
async def auswertungen_export_csv():
"""Long-Format-CSV-Export aller Assessments. Deckt #45 mit ab."""
from .auswertungen import export_long_format
csv_text = export_long_format()
return Response(
content=csv_text,
media_type="text/csv",
headers={"Content-Disposition": 'attachment; filename="gwoe-assessments.csv"'},
)
# ─── v2 Frontend (#139 Phase 2 + Phase 3) ───────────────────────────────────
# / ist jetzt Default-v2. /v2 leitet auf / weiter; /v2/antrag/* auf /antrag/*.
# /classic ist die alte Ansicht (index.html unverändert).
# Phase 3: /v2/merkliste, /v2/tags, /v2/cluster, /v2/neu, /v2/batch
@app.get("/v2", response_class=HTMLResponse)
async def v2_redirect(request: Request):
"""Redirect /v2 → / (v2 ist jetzt Default unter /)."""
from fastapi.responses import RedirectResponse
return RedirectResponse("/", status_code=301)
@app.get("/v2/antrag/{drucksache:path}", response_class=HTMLResponse)
async def v2_antrag_redirect(request: Request, drucksache: str):
"""Redirect /v2/antrag/XX → /antrag/XX (kanonische URL)."""
from fastapi.responses import RedirectResponse
return RedirectResponse(f"/antrag/{drucksache}", status_code=301)
@app.get("/v2/merkliste", response_class=HTMLResponse)
async def v2_merkliste(request: Request, current_user: dict = Depends(require_auth)):
"""Merkliste (Bookmarks) — nur für eingeloggte User; lädt Daten via /api/bookmarks client-seitig."""
return templates.TemplateResponse("v2/screens/merkliste.html", {
"request": request,
"v2_active_nav": "merkliste",
**_v2_template_context(current_user),
})
@app.get("/v2/tags", response_class=HTMLResponse)
async def v2_tags(request: Request, current_user: Optional[dict] = Depends(get_current_user)):
"""Tag-Cloud-Seite — Themen-Filter über alle Assessments."""
return templates.TemplateResponse("v2/screens/tags.html", {
"request": request,
"v2_active_nav": "tags",
**_v2_template_context(current_user),
})
@app.get("/v2/abos", response_class=HTMLResponse)
async def v2_abos(request: Request, current_user: dict = Depends(require_auth)):
"""Eigene E-Mail-Abos verwalten — auth-only."""
from .parteien import all_canonical_keys
# Landesregierung als Filter unsinnig — ausblenden
parteien = [p for p in all_canonical_keys() if p != "Landesregierung"]
return templates.TemplateResponse("v2/screens/abos.html", {
"request": request,
"v2_active_nav": "abos",
"parteien": parteien,
**_v2_template_context(current_user),
})
@app.get("/v2/feed", response_class=HTMLResponse)
async def v2_feed(request: Request, current_user: dict = Depends(require_auth)):
"""Atom-Feed-Konfigurations-Seite — auth-only."""
from .parteien import all_canonical_keys
parteien = [p for p in all_canonical_keys() if p != "Landesregierung"]
return templates.TemplateResponse("v2/screens/feed.html", {
"request": request,
"v2_active_nav": "feed",
"parteien": parteien,
**_v2_template_context(current_user),
})
@app.get("/v2/cluster", response_class=HTMLResponse)
async def v2_cluster(request: Request, current_user: dict = Depends(require_admin)):
"""Cluster-Liste — nur für Admins."""
rows = await get_all_assessments(None)
assessments = _rows_to_list(rows)
bl_codes = sorted({a["bundesland"] for a in assessments if a.get("bundesland")})
return templates.TemplateResponse("v2/screens/cluster.html", {
"request": request,
"v2_active_nav": "cluster",
"bl_codes": bl_codes,
**_v2_template_context(current_user),
})
@app.get("/v2/neu", response_class=HTMLResponse)
async def v2_neu(request: Request, current_user: dict = Depends(require_auth)):
"""Neuer-Antrag-Form — nur für eingeloggte User; startet Analyse via /api/analyze-drucksache."""
from .bundeslaender import alle_bundeslaender
bl_list = [
{"code": bl.code, "name": bl.name}
for bl in alle_bundeslaender()
if bl.aktiv
]
return templates.TemplateResponse("v2/screens/neu.html", {
"request": request,
"v2_active_nav": "neu",
"bundeslaender": sorted(bl_list, key=lambda x: x["name"]),
"default_model": settings.llm_model_default,
**_v2_template_context(current_user),
})
@app.get("/v2/landtag-suche", response_class=HTMLResponse)
async def v2_landtag_suche(request: Request, current_user: dict = Depends(require_auth)):
"""Landtag-Suche — nur für eingeloggte User; sucht Drucksachen live im Landtags-Portal."""
from .bundeslaender import alle_bundeslaender
bl_list = [
{"code": bl.code, "name": bl.name}
for bl in alle_bundeslaender()
if bl.aktiv
]
return templates.TemplateResponse("v2/screens/landtag_suche.html", {
"request": request,
"v2_active_nav": "landtag_suche",
"bundeslaender": sorted(bl_list, key=lambda x: x["name"]),
**_v2_template_context(current_user),
})
@app.get("/v2/batch", response_class=HTMLResponse)
async def v2_batch(request: Request, current_user: dict = Depends(require_admin)):
"""Batch-Analyse-Form — nur für Admins; enqueued ungeprüfte Drucksachen eines BL."""
from .bundeslaender import alle_bundeslaender
bl_list = [
{"code": bl.code, "name": bl.name}
for bl in alle_bundeslaender()
if bl.aktiv
]
return templates.TemplateResponse("v2/screens/batch.html", {
"request": request,
"v2_active_nav": "batch",
"bundeslaender": sorted(bl_list, key=lambda x: x["name"]),
**_v2_template_context(current_user),
})
# ─── v2 Admin-Screens ────────────────────────────────────────────────────────
@app.get("/v2/admin/freischaltungen", response_class=HTMLResponse)
async def v2_admin_freischaltungen(request: Request, user: dict = Depends(require_admin)):
"""Ausstehende User-Freischaltungen (Admin)."""
return templates.TemplateResponse("v2/screens/admin_freischaltungen.html", {
"request": request,
"v2_active_nav": "admin_freischaltungen",
**_v2_template_context(user),
})
@app.get("/v2/admin/queue", response_class=HTMLResponse)
async def v2_admin_queue(request: Request, user: dict = Depends(require_admin)):
"""Queue-Status-Übersicht (Admin)."""
return templates.TemplateResponse("v2/screens/admin_queue.html", {
"request": request,
"v2_active_nav": "admin_queue",
**_v2_template_context(user),
})
@app.get("/v2/admin/abos", response_class=HTMLResponse)
async def v2_admin_abos(request: Request, user: dict = Depends(require_admin)):
"""Abo-Verwaltung — alle E-Mail-Abonnements (Admin)."""
return templates.TemplateResponse("v2/screens/admin_abos.html", {
"request": request,
"v2_active_nav": "admin_abos",
**_v2_template_context(user),
})
# ─── #141 Open-Graph-Karten ──────────────────────────────────────────────────
@app.get("/v2/og-template", response_class=HTMLResponse)
async def og_template(request: Request, drucksache: str = ""):
"""Internes Render-Template für Playwright (#141).
Wird von render_og_card() intern aufgerufen — nicht für Endnutzer.
"""
antrag = None
if drucksache:
try:
drucksache = validate_drucksache(drucksache)
row = await get_assessment(drucksache)
if row:
antrag = _row_to_detail(row)
except Exception:
pass
return templates.TemplateResponse("v2/og_template.html", {
"request": request,
"antrag": antrag,
})
@app.get("/api/og/{drucksache_encoded}.png")
async def api_og_card(drucksache_encoded: str, request: Request):
"""Liefert die Open-Graph-PNG-Karte für einen Antrag (#141).
Cache-Hit → direkte Datei-Response.
Cache-Miss → Playwright-Render (synchron, blockiert kurz).
"""
import urllib.parse
from .og_card import render_og_card, get_cached
from .config import settings
drucksache = urllib.parse.unquote(drucksache_encoded)
try:
drucksache = validate_drucksache(drucksache)
except Exception:
raise HTTPException(status_code=400, detail="Ungültige Drucksachen-ID")
row = await get_assessment(drucksache)
if not row:
raise HTTPException(status_code=404, detail="Antrag nicht gefunden")
updated_at = row.get("updated_at", "")
og_cache_dir = settings.data_dir / "og-cache"
cached_path = get_cached(drucksache, updated_at, og_cache_dir)
if cached_path:
return FileResponse(str(cached_path), media_type="image/png")
# Interne URL: der laufende Server selbst (Playwright greift loopback an)
base_url = f"http://127.0.0.1:{settings.port}"
png_bytes = render_og_card(drucksache, updated_at, base_url, og_cache_dir)
if not png_bytes:
raise HTTPException(status_code=500, detail="OG-Render fehlgeschlagen")
return Response(content=png_bytes, media_type="image/png")
# ─── #138 Admin: Wahlprogramm-Beschaffung ────────────────────────────────────
@app.get("/v2/admin/wahlprogramme", response_class=HTMLResponse)
async def v2_admin_wahlprogramme(request: Request, user: dict = Depends(require_admin)):
"""Admin-Übersicht fehlender Wahlprogramme mit Kandidaten-URLs (#138)."""
from .wahlprogramm_fetch import get_missing_programmes
missing = get_missing_programmes()
return templates.TemplateResponse("v2/screens/admin_wahlprogramme.html", {
"request": request,
"v2_active_nav": "admin_wahlprogramme",
"is_admin": True,
"missing": missing,
})
@app.post("/api/admin/wahlprogramm-fetch")
async def api_admin_wahlprogramm_fetch(
request: Request,
user: dict = Depends(require_admin),
):
"""Lädt ein Wahlprogramm von einer angegebenen URL (#138).
Body: JSON { "bl": "NRW", "partei": "BSW", "url": "https://..." }
SHA-Gate: Wenn eine Datei bereits vorhanden ist und der SHA abweicht,
wird sie nicht überschrieben — stattdessen wird ein 409-Fehler zurückgegeben.
"""
from .wahlprogramm_fetch import fetch_and_verify, suggest_candidates
from .wahlprogramme import WAHLPROGRAMME
body = await request.json()
bl = body.get("bl", "").strip().upper()
partei = body.get("partei", "").strip().upper()
url = body.get("url", "").strip()
expected_sha = body.get("expected_sha", None)
if not bl or not partei:
raise HTTPException(status_code=400, detail="bl und partei sind Pflichtfelder")
if not url:
candidates = suggest_candidates(bl, partei)
if not candidates:
raise HTTPException(
status_code=400,
detail=f"Keine URL angegeben und keine Kandidaten für {bl}/{partei} hinterlegt",
)
url = candidates[0]["url"]
wp_info = WAHLPROGRAMME.get(bl, {}).get(partei)
if wp_info:
from pathlib import Path as _Path
dest = _Path(__file__).parent / "static" / "referenzen" / wp_info["file"]
else:
from pathlib import Path as _Path
dest = _Path(__file__).parent / "static" / "referenzen" / f"{partei.lower()}-{bl.lower()}-neu.pdf"
result = fetch_and_verify(url, dest, expected_sha)
if not result["ok"]:
# SHA-Abweichung oder Download-Fehler → 409 damit kein stilles Überschreiben
status = 409 if "SHA" in (result["error"] or "") else 502
raise HTTPException(status_code=status, detail=result["error"])
return JSONResponse({
"ok": True,
"sha256": result["sha256"],
"prev_sha256": result["prev_sha256"],
"changed": result["changed"],
"dest": str(dest.name),
})
# ─── Feedback / Bug-Report — Gitea-Issue-Anbindung ───────────────────────────
def _strip_html(text: str, max_len: int) -> str:
"""Minimale HTML-Tag-Entfernung + Längenbegrenzung für Nutzerinput."""
import re
cleaned = re.sub(r'<[^>]+>', '', text)
return cleaned[:max_len]
async def _gitea_ensure_label(session, base_url: str, owner: str, repo: str,
token: str, label_name: str, color: str = "#e11d48") -> int | None:
"""Gibt die ID des Labels zurück; legt es idempotent an, falls es fehlt."""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
url = f"{base_url}/repos/{owner}/{repo}/labels"
try:
r = await session.get(url, headers=headers)
if r.status_code == 200:
for lbl in r.json():
if lbl.get("name") == label_name:
return lbl["id"]
# Label fehlt → anlegen
r2 = await session.post(url, headers=headers,
json={"name": label_name, "color": color})
if r2.status_code in (200, 201):
return r2.json().get("id")
except Exception as exc:
logger.exception("Gitea-Label-Lookup fehlgeschlagen: %s", exc)
return None
async def _gitea_upload_screenshot(session, base_url: str, owner: str, repo: str,
token: str, issue_index: int,
data_uri: str) -> str | None:
"""Lädt einen Screenshot als Issue-Asset hoch. Gibt Attachment-URL zurück oder None."""
import base64, re as _re
m = _re.match(r'data:(image/[a-z]+);base64,(.+)', data_uri, _re.DOTALL)
if not m:
return None
mime, b64data = m.group(1), m.group(2)
try:
raw = base64.b64decode(b64data)
except Exception:
return None
ext = mime.split('/')[-1]
upload_url = f"{base_url}/repos/{owner}/{repo}/issues/{issue_index}/assets"
headers = {"Authorization": f"token {token}"}
files = {"attachment": (f"screenshot.{ext}", raw, mime)}
try:
r = await session.post(upload_url, headers=headers, files=files)
if r.status_code in (200, 201):
return r.json().get("browser_download_url") or r.json().get("download_url")
except Exception as exc:
logger.exception("Screenshot-Upload fehlgeschlagen: %s", exc)
return None
@app.post("/api/feedback")
@limiter.limit("5/hour")
async def submit_feedback(
request: Request,
titel: str = Form(...),
beschreibung: str = Form(...),
url: str = Form(""),
user_agent: str = Form(""),
viewport: str = Form(""),
drucksache: str = Form(""),
klicks_json: str = Form("[]"),
errors_json: str = Form("[]"),
screenshot: Optional[str] = Form(None),
screenshot_error: Optional[str] = Form(None),
current_user: Optional[dict] = Depends(get_current_user),
):
"""Erstellt ein Gitea-Issue mit Label 'feedback'.
Audit-Trail (Klicks, Errors, URL etc.) wird im Issue-Body als
Markdown-Code-Block angefügt. Screenshot wird als Issue-Asset
hochgeladen, falls vorhanden.
"""
import json as _json
import httpx
# Validierung
titel_clean = _strip_html(titel, 200).strip()
beschreibung_clean = _strip_html(beschreibung, 5000).strip()
if not titel_clean:
raise HTTPException(status_code=400, detail="Titel darf nicht leer sein")
if not beschreibung_clean:
raise HTTPException(status_code=400, detail="Beschreibung darf nicht leer sein")
# Audit-Trail parsen
try:
klicks = _json.loads(klicks_json)[:15]
except Exception:
klicks = []
try:
errors = _json.loads(errors_json)[:10]
except Exception:
errors = []
# User-Identität (wenn eingeloggt)
user_email = ""
user_name = ""
if current_user:
user_email = current_user.get("email", "")
user_name = current_user.get("preferred_username", current_user.get("name", ""))
# Issue-Body zusammenbauen
body_parts = [beschreibung_clean, ""]
body_parts.append("## Kontext")
body_parts.append(f"- **URL:** `{url[:300]}`")
if drucksache:
body_parts.append(f"- **Drucksache:** `{drucksache[:100]}`")
body_parts.append(f"- **Viewport:** {viewport}")
body_parts.append(f"- **User-Agent:** `{user_agent[:200]}`")
if user_name:
body_parts.append(f"- **Gemeldet von:** {user_name} ({user_email})")
else:
body_parts.append("- **Gemeldet von:** anonym")
body_parts.append("")
if klicks:
body_parts.append("## Letzte Klicks (Audit-Trail)")
body_parts.append("```")
for c in klicks:
txt_part = f' "{c["txt"]}"' if c.get("txt") else ""
body_parts.append(f'{c.get("t","")[-8:]} {c.get("el","")}{txt_part}')
body_parts.append("```")
body_parts.append("")
if errors:
body_parts.append("## Console-Errors")
body_parts.append("```")
for err in errors:
body_parts.append(f'{err.get("t","")[-8:]} {err.get("msg","")} @ {err.get("src","")}')
body_parts.append("```")
body_parts.append("")
if screenshot_error:
body_parts.append(f"_Screenshot angefordert, aber fehlgeschlagen: `{screenshot_error[:200]}`_")
body_parts.append("")
issue_body = "\n".join(body_parts)
if not settings.gitea_token:
logger.warning("GITEA_TOKEN nicht gesetzt — Feedback-Issue kann nicht angelegt werden")
raise HTTPException(
status_code=503,
detail="Feedback-Funktion ist derzeit nicht konfiguriert (kein Gitea-Token)."
)
base_url = settings.gitea_api_url
owner = settings.gitea_repo_owner
repo = settings.gitea_repo_name
token = settings.gitea_token
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
async with httpx.AsyncClient(timeout=15.0) as session:
# Label sicherstellen
label_id = await _gitea_ensure_label(session, base_url, owner, repo, token, "feedback")
label_ids = [label_id] if label_id else []
# Issue anlegen
payload = {
"title": titel_clean,
"body": issue_body,
"label_ids": label_ids,
}
try:
r = await session.post(
f"{base_url}/repos/{owner}/{repo}/issues",
headers=headers,
json=payload,
)
except httpx.RequestError as exc:
logger.exception("Gitea-Request fehlgeschlagen: %s", exc)
raise HTTPException(status_code=502, detail="Gitea nicht erreichbar")
if r.status_code not in (200, 201):
logger.error("Gitea-Issue-Anlage fehlgeschlagen: %s %s", r.status_code, r.text[:500])
raise HTTPException(status_code=502, detail=f"Gitea: {r.status_code}")
issue = r.json()
issue_index = issue.get("number") or issue.get("id")
issue_url = issue.get("html_url", "")
# Screenshot hochladen (optional)
if screenshot and issue_index:
att_url = await _gitea_upload_screenshot(
session, base_url, owner, repo, token, issue_index, screenshot
)
if att_url:
# Screenshot-Link als Kommentar anhängen
comment_headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
await session.post(
f"{base_url}/repos/{owner}/{repo}/issues/{issue_index}/comments",
headers=comment_headers,
json={"body": f"**Screenshot:**\n\n![screenshot]({att_url})"},
)
except Exception as exc:
logger.exception("Screenshot-Kommentar fehlgeschlagen: %s", exc)
logger.info("Feedback-Issue #%s angelegt: %s", issue_index, issue_url)
return JSONResponse({"issue_id": issue_index, "issue_url": issue_url})
# Health check
@app.get("/health")
async def health():
return {"status": "ok", "version": settings.app_version}