gwoe-antragspruefer/app/main.py
Dotty Dotter ac18743ff2 Add central bundeslaender.py module with all 16 states (#7)
Introduces app/bundeslaender.py as the single source of truth for all
bundesland-specific data (parliament name, current legislative period,
upcoming elections, governing coalition, doku system, base URLs,
drucksache format, dokukratie scraper code, active flag, optional
remarks). Data reflects April 2026 state.

main.py::index() and /api/bundeslaender now derive their lists from
this module instead of hardcoding. Frontend dropdown now shows all 16
bundesländer (15 disabled with "(bald)" suffix); previously the
landing template showed only 4. NRW remains the only "aktiv" entry.

API behaviour change worth noting: the /api/bundeslaender endpoint
previously emitted code "ST" for Sachsen-Anhalt; it now emits "LSA"
to match the politically dominant abbreviation. No functional impact
because non-NRW bundesländer were inactive in both versions.

Foundation for #5 and #2; deliberately a no-op for NRW so it can ship
and rollback independently.

Resolves issue #7.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:17:54 +02:00

561 lines
19 KiB
Python

"""GWÖ-Antragsprüfer — FastAPI Webapp."""
import uuid
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, File, Form, UploadFile, Request, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, Response
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from .config import settings
from .database import (
init_db, get_job, create_job, update_job,
get_all_assessments, get_assessment, upsert_assessment, import_json_assessments,
search_assessments
)
from .parlamente import get_adapter, ADAPTERS
from .bundeslaender import alle_bundeslaender
from .analyzer import analyze_antrag
from .report import generate_html_report, generate_pdf_report
from .embeddings import (
init_embeddings_db, get_programme_info, get_indexing_status,
index_programm, PROGRAMME
)
# The ASGI application object. Title and version come from the project
# settings; all three auto-generated documentation endpoints are switched off.
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    docs_url=None,  # Disable /docs in production
    redoc_url=None,  # Disable /redoc in production
    openapi_url=None,  # Disable /openapi.json in production
)
# Security Headers Middleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Middleware that stamps a fixed set of security headers on every response."""

    async def dispatch(self, request: Request, call_next):
        """Call the downstream handler and add the security headers to its response.

        Args:
            request: The incoming request, passed through unchanged.
            call_next: Callable that produces the downstream response.

        Returns:
            The downstream response with the additional headers set.
        """
        outgoing = await call_next(request)

        # CSP: Allow self, inline styles (for templates), and PDF viewer
        content_security_policy = (
            "default-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "script-src 'self' 'unsafe-inline'; "
            "img-src 'self' data:; "
            "frame-ancestors 'none';"
        )
        header_table = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "X-XSS-Protection": "1; mode=block",
            "Referrer-Policy": "strict-origin-when-cross-origin",
            "Permissions-Policy": "geolocation=(), microphone=(), camera=()",
            "Content-Security-Policy": content_security_policy,
        }
        # Assign one by one, in the listed order, overwriting any value the
        # downstream handler may already have set for the same header.
        for header_name, header_value in header_table.items():
            outgoing.headers[header_name] = header_value
        return outgoing


app.add_middleware(SecurityHeadersMiddleware)
# Setup directories
# Create the data and report directories named in settings if they are missing.
settings.data_dir.mkdir(exist_ok=True)
settings.reports_dir.mkdir(exist_ok=True)
# Static files and templates
# Both live next to this module; they are created on import if absent.
static_dir = Path(__file__).parent / "static"
templates_dir = Path(__file__).parent / "templates"
static_dir.mkdir(exist_ok=True)
templates_dir.mkdir(exist_ok=True)
# NOTE(review): StaticFiles presumably validates that the directory exists when
# it is constructed, so the mkdir calls above must stay before this mount — confirm.
app.mount("/static", StaticFiles(directory=static_dir), name="static")
templates = Jinja2Templates(directory=str(templates_dir))
@app.on_event("startup")
async def startup():
    """Run the database and embeddings-store initialisation at application start."""
    await init_db()
    init_embeddings_db()
    # The legacy JSON import (import_json_assessments over
    # settings.data_dir / "assessments") is deliberately NOT run here:
    # all assessments now live in the SQLite DB only, and the import would
    # overwrite new v5 assessments with the old format.
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page with the upload form.

    The template receives one entry per bundesland returned by
    ``alle_bundeslaender()``, reduced to ``code``, ``name`` and ``active``.
    """
    state_options = []
    for state in alle_bundeslaender():
        state_options.append(
            {"code": state.code, "name": state.name, "active": state.aktiv}
        )

    context = {
        "request": request,
        "app_name": settings.app_name,
        "bundeslaender": state_options,
    }
    return templates.TemplateResponse("index.html", context)
@app.post("/analyze")
async def start_analysis(
    background_tasks: BackgroundTasks,
    text: Optional[str] = Form(None),
    file: Optional[UploadFile] = File(None),
    bundesland: str = Form("NRW"),
    model: str = Form("qwen-plus"),
):
    """Start an analysis job for pasted text or an uploaded PDF.

    If a file with a filename is uploaded, its extracted text replaces any
    pasted ``text``. The job is created immediately and the analysis itself
    runs as a background task.

    Args:
        background_tasks: Task queue used to schedule ``run_analysis``.
        text: Pasted text of the document, if any.
        file: Uploaded PDF, if any.
        bundesland: Bundesland code forwarded to the analysis.
        model: Model name forwarded to the analysis.

    Returns:
        JSON with the new ``job_id`` and the status ``"queued"``.

    Raises:
        HTTPException: 400 if neither text nor file was sent, if the PDF cannot
            be opened, or if no usable text remains after extraction.
    """
    if not text and not file:
        raise HTTPException(status_code=400, detail="Entweder Text oder PDF-Datei erforderlich")

    # Extract text from PDF if uploaded
    if file and file.filename:
        import fitz  # PyMuPDF

        pdf_bytes = await file.read()
        try:
            # The context manager closes the document even if a page fails to
            # extract; join avoids repeated string concatenation.
            with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
                text = "".join(page.get_text() for page in doc)
        except (RuntimeError, ValueError) as exc:
            # A broken or non-PDF upload is a client error, not a server fault.
            raise HTTPException(
                status_code=400,
                detail=f"PDF konnte nicht gelesen werden: {exc}",
            ) from exc

    # Guards the slice below: an upload part without a filename and without
    # pasted text leaves `text` as None, and a PDF without a text layer
    # yields an empty string.
    if not text or not text.strip():
        raise HTTPException(
            status_code=400,
            detail="Kein Text gefunden: Eingabe ist leer oder PDF enthält keinen extrahierbaren Text",
        )

    # Create job (only the first 500 characters are passed to create_job)
    job_id = str(uuid.uuid4())
    await create_job(job_id, text[:500], bundesland, model)

    # Start background analysis
    background_tasks.add_task(run_analysis, job_id, text, bundesland, model)
    return JSONResponse({"job_id": job_id, "status": "queued"})
async def run_analysis(job_id: str, text: str, bundesland: str, model: str):
    """Background task: analyse ``text`` and write the HTML and PDF reports.

    The job is marked ``processing`` first, then ``completed`` together with
    the serialized result and both report paths. Any exception marks the job
    ``failed`` and stores the exception text.
    """
    try:
        await update_job(job_id, status="processing")

        # Run LLM analysis
        outcome = await analyze_antrag(text, bundesland, model)

        # Generate reports; both files are named after the job id
        report_files = {
            suffix: settings.reports_dir / f"{job_id}.{suffix}"
            for suffix in ("html", "pdf")
        }
        await generate_html_report(outcome, report_files["html"])
        await generate_pdf_report(outcome, report_files["pdf"])

        await update_job(
            job_id,
            status="completed",
            result=outcome.model_dump_json(),
            html_path=str(report_files["html"]),
            pdf_path=str(report_files["pdf"]),
        )
    except Exception as failure:
        await update_job(job_id, status="failed", error=str(failure))
@app.get("/status/{job_id}")
async def get_status(job_id: str):
    """Report the status and creation time of a job.

    Raises:
        HTTPException: 404 if no job with this id is found.
    """
    job = await get_job(job_id)
    if job:
        payload = {
            "job_id": job_id,
            "status": job["status"],
            "created_at": job["created_at"],
        }
        return JSONResponse(payload)
    raise HTTPException(status_code=404, detail="Job nicht gefunden")
@app.get("/result/{job_id}", response_class=HTMLResponse)
async def get_result(request: Request, job_id: str):
    """Serve the stored HTML report of a completed job.

    Raises:
        HTTPException: 404 if the job is unknown, 400 if it is not completed,
            500 if the report file recorded for the job does not exist.
    """
    job = await get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")

    current_state = job["status"]
    if current_state != "completed":
        raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {current_state}")

    report_file = Path(job["html_path"])
    if not report_file.exists():
        raise HTTPException(status_code=500, detail="Report nicht gefunden")
    return HTMLResponse(report_file.read_text())
@app.get("/result/{job_id}/pdf")
async def get_pdf(job_id: str):
    """Send the PDF report of a completed job as a download.

    The download name uses the first eight characters of the job id.

    Raises:
        HTTPException: 404 if the job is unknown, 400 if it is not completed,
            500 if the PDF file recorded for the job does not exist.
    """
    job = await get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")

    current_state = job["status"]
    if current_state != "completed":
        raise HTTPException(status_code=400, detail=f"Job noch nicht fertig: {current_state}")

    report_file = Path(job["pdf_path"])
    if not report_file.exists():
        raise HTTPException(status_code=500, detail="PDF nicht gefunden")

    download_name = f"gwoe-bericht-{job_id[:8]}.pdf"
    return FileResponse(
        report_file,
        media_type="application/pdf",
        filename=download_name,
    )
# API: Load assessments from database
def _assessment_row_to_frontend(row):
    """Convert one assessment DB row into the camelCase shape the frontend uses.

    ``row`` is read only through ``.get``. List-valued columns fall back to an
    empty list when the key is absent; all other columns fall back to ``None``.
    """
    return {
        "drucksache": row.get("drucksache"),
        "title": row.get("title"),
        "fraktionen": row.get("fraktionen", []),
        "datum": row.get("datum"),
        "link": row.get("link"),
        "gwoeScore": row.get("gwoe_score"),
        "gwoeBegründung": row.get("gwoe_begruendung"),
        "gwoeMatrix": row.get("gwoe_matrix", []),
        "gwoeSchwerpunkt": row.get("gwoe_schwerpunkt", []),
        "wahlprogrammScores": row.get("wahlprogramm_scores", []),
        "verbesserungen": row.get("verbesserungen", []),
        "stärken": row.get("staerken", []),
        "schwächen": row.get("schwaechen", []),
        "empfehlung": row.get("empfehlung"),
        "empfehlungSymbol": row.get("empfehlung_symbol"),
        "verbesserungspotenzial": row.get("verbesserungspotenzial"),
        "themen": row.get("themen", []),
        "antragZusammenfassung": row.get("antrag_zusammenfassung"),
        "antragKernpunkte": row.get("antrag_kernpunkte", []),
    }


@app.get("/api/assessments")
async def list_assessments():
    """Return all assessments from the database in frontend format."""
    rows = await get_all_assessments()
    # Convert DB format to frontend format
    return [_assessment_row_to_frontend(row) for row in rows]


# API: Get single assessment (use query param for drucksache with /)
@app.get("/api/assessment")
async def get_single_assessment(drucksache: str):
    """Return a single assessment, looked up by its drucksache ID.

    Args:
        drucksache: Drucksache identifier, taken from the query string.

    Raises:
        HTTPException: 404 if no assessment exists for this drucksache.
    """
    row = await get_assessment(drucksache)
    if not row:
        raise HTTPException(status_code=404, detail="Assessment nicht gefunden")
    return _assessment_row_to_frontend(row)
# API: Generate PDF on demand for an assessment
@app.get("/api/assessment/pdf")
async def download_assessment_pdf(drucksache: str):
    """Serve the PDF report for a stored assessment, generating it on first use.

    The file is stored in the reports directory under the drucksache ID with
    ``/`` replaced by ``-``. An existing file is served as is.

    Raises:
        HTTPException: 404 if the assessment is unknown, 500 if building the
            model or rendering the PDF fails.
    """
    from .models import Assessment

    record = await get_assessment(drucksache)
    if not record:
        raise HTTPException(status_code=404, detail="Assessment nicht gefunden")

    # Check if PDF already exists
    file_stem = drucksache.replace("/", "-")
    target = settings.reports_dir / f"{file_stem}.pdf"

    if not target.exists():

        def text_or_blank(column):
            # Falsy DB values (None, "") become an empty string.
            return record.get(column) or ""

        def list_or_empty(column):
            # Only a missing key falls back to []; a stored value is passed through.
            return record.get(column, [])

        # Convert DB row to Assessment model for report generation
        model_fields = {
            "drucksache": record.get("drucksache"),
            "title": record.get("title"),
            "fraktionen": list_or_empty("fraktionen"),
            "datum": record.get("datum"),
            "link": record.get("link"),
            "gwoe_score": record.get("gwoe_score") or 0,
            "gwoe_begruendung": text_or_blank("gwoe_begruendung"),
            "gwoe_matrix": list_or_empty("gwoe_matrix"),
            "gwoe_schwerpunkt": list_or_empty("gwoe_schwerpunkt"),
            "wahlprogramm_scores": list_or_empty("wahlprogramm_scores"),
            "verbesserungen": list_or_empty("verbesserungen"),
            "staerken": list_or_empty("staerken"),
            "schwaechen": list_or_empty("schwaechen"),
            "empfehlung": text_or_blank("empfehlung"),
            "empfehlung_symbol": text_or_blank("empfehlung_symbol"),
            "verbesserungspotenzial": text_or_blank("verbesserungspotenzial"),
            "themen": list_or_empty("themen"),
            "antrag_zusammenfassung": text_or_blank("antrag_zusammenfassung"),
            "antrag_kernpunkte": list_or_empty("antrag_kernpunkte"),
        }
        try:
            await generate_pdf_report(Assessment(**model_fields), target)
        except Exception as failure:
            raise HTTPException(status_code=500, detail=f"PDF-Generierung fehlgeschlagen: {failure}")

    return FileResponse(
        target,
        media_type="application/pdf",
        filename=f"gwoe-{file_stem}.pdf",
    )
# API: Search internal DB only
@app.get("/api/search")
async def search_internal(
    q: str,
    bundesland: str = "NRW",
    limit: int = 50
):
    """Search the internal assessments database only.

    Args:
        q: Search query.
        bundesland: Bundesland code passed to the search and echoed in each hit.
        limit: Maximum number of hits passed to the search.

    Returns:
        A list of hit dicts, each with the status ``"checked"``.
    """
    hits = await search_assessments(q, bundesland, limit)
    return [
        {
            "drucksache": hit.get("drucksache"),
            "title": hit.get("title"),
            "fraktionen": hit.get("fraktionen", []),
            "datum": hit.get("datum"),
            "link": hit.get("link"),
            "bundesland": bundesland,
            "gwoeScore": hit.get("gwoe_score"),
            "themen": hit.get("themen", []),
            "status": "checked",
        }
        for hit in hits
    ]
# API: Search external parliament portal (Landtag)
@app.get("/api/search-landtag")
async def search_landtag(
    q: str,
    bundesland: str = "NRW",
    limit: int = 20
):
    """Search the external parliament portal (e.g., NRW OPAL).

    Hits carry the status ``"unchecked"`` and no score, so they can be
    analyzed with "Jetzt prüfen".

    Returns:
        A list of hit dicts, or a dict with an ``error`` key when the
        bundesland has no adapter or the search raises.
    """
    portal = get_adapter(bundesland)
    if not portal:
        return {"error": f"Bundesland {bundesland} noch nicht unterstützt"}

    try:
        found = await portal.search(q, limit)
        return [
            {
                "drucksache": item.drucksache,
                "title": item.title,
                "fraktionen": item.fraktionen,
                "datum": item.datum,
                "link": item.link,
                "bundesland": bundesland,
                "typ": item.typ,
                "gwoeScore": None,
                "status": "unchecked",
            }
            for item in found
        ]
    except Exception as err:
        # Failures are reported in the response body rather than as an HTTP error.
        print(f"Landtag search error: {err}")
        return {"error": f"Suchfehler: {str(err)}"}
# API: Analyze a document from parliament portal
@app.post("/api/analyze-drucksache")
async def analyze_drucksache(
    background_tasks: BackgroundTasks,
    drucksache: str = Form(...),
    bundesland: str = Form("NRW"),
    model: str = Form("qwen-plus")
):
    """Download a document from the parliament portal and queue its analysis.

    Returns:
        ``{"status": "already_checked", ...}`` if an assessment already exists,
        otherwise ``{"status": "queued", ...}`` with the new job id.

    Raises:
        HTTPException: 400 if the bundesland has no adapter, 404 if the
            adapter returns no text for the drucksache.
    """
    # Check if already analyzed
    if await get_assessment(drucksache):
        return {"status": "already_checked", "drucksache": drucksache}

    # Get adapter and download
    portal = get_adapter(bundesland)
    if not portal:
        raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt")

    # Download text
    document_text = await portal.download_text(drucksache)
    if not document_text:
        raise HTTPException(status_code=404, detail=f"Dokument {drucksache} nicht gefunden")

    # Get document metadata
    metadata = await portal.get_document(drucksache)

    # Create job (only the first 500 characters are passed to create_job)
    job_id = str(uuid.uuid4())
    await create_job(job_id, document_text[:500], bundesland, model)

    # Start background analysis
    background_tasks.add_task(
        run_drucksache_analysis,
        job_id,
        drucksache,
        document_text,
        bundesland,
        model,
        metadata,
    )
    return {"status": "queued", "job_id": job_id, "drucksache": drucksache}
async def run_drucksache_analysis(
    job_id: str,
    drucksache: str,
    text: str,
    bundesland: str,
    model: str,
    doc
):
    """Background task: analyse a portal document, store it and write reports.

    Title and date come from the analysis when set and otherwise from ``doc``;
    the link always comes from ``doc``. ``doc`` may be falsy, in which case
    placeholder values are used. Any exception is printed with its traceback
    and marks the job ``failed``.
    """

    def dump_each(items):
        # Serialize a list of model objects into plain dicts.
        return [item.model_dump() for item in items]

    try:
        await update_job(job_id, status="processing")

        # Run LLM analysis
        outcome = await analyze_antrag(text, bundesland, model)

        # Prepare data for DB
        record = {
            "drucksache": drucksache,
            "title": outcome.title or (doc.title if doc else f"Drucksache {drucksache}"),
            "fraktionen": outcome.fraktionen,
            "datum": outcome.datum or (doc.datum if doc else ""),
            "link": doc.link if doc else "",
            "bundesland": bundesland,
            "gwoeScore": outcome.gwoe_score,
            "gwoeBegründung": outcome.gwoe_begruendung,
            "gwoeMatrix": dump_each(outcome.gwoe_matrix),
            "gwoeSchwerpunkt": outcome.gwoe_schwerpunkt,
            "wahlprogrammScores": dump_each(outcome.wahlprogramm_scores),
            "verbesserungen": dump_each(outcome.verbesserungen),
            "stärken": outcome.staerken,
            "schwächen": outcome.schwaechen,
            "empfehlung": outcome.empfehlung,
            "empfehlungSymbol": outcome.empfehlung_symbol,
            "verbesserungspotenzial": outcome.verbesserungspotenzial,
            "themen": outcome.themen,
            "antragZusammenfassung": outcome.antrag_zusammenfassung,
            "antragKernpunkte": outcome.antrag_kernpunkte,
            "source": "webapp",
            "model": model,
        }

        # Save to DB
        await upsert_assessment(record)

        # Generate reports; both files are named after the job id
        html_target = settings.reports_dir / f"{job_id}.html"
        pdf_target = settings.reports_dir / f"{job_id}.pdf"
        await generate_html_report(outcome, html_target)
        await generate_pdf_report(outcome, pdf_target)

        await update_job(
            job_id,
            status="completed",
            result=outcome.model_dump_json(),
            html_path=str(html_target),
            pdf_path=str(pdf_target),
        )
    except Exception as failure:
        import traceback

        print(f"ERROR in run_drucksache_analysis for {drucksache}: {failure}")
        print(traceback.format_exc())
        await update_job(job_id, status="failed", error=str(failure))
# API: List available Bundesländer
@app.get("/api/bundeslaender")
async def list_bundeslaender():
    """List every bundesland with its code, name and active flag."""
    listing = []
    for state in alle_bundeslaender():
        listing.append(
            {"code": state.code, "name": state.name, "active": state.aktiv}
        )
    return listing
# === Quellen / Programme ===
@app.get("/quellen", response_class=HTMLResponse)
async def quellen_page(request: Request):
    """Render the sources page listing all election and party programmes."""
    context = {
        "request": request,
        "app_name": settings.app_name,
        "programmes": get_programme_info(),
        "status": get_indexing_status(),
    }
    return templates.TemplateResponse("quellen.html", context)
@app.get("/api/programme")
async def list_programme():
    """Return the programme information provided by the embeddings module."""
    programme_info = get_programme_info()
    return programme_info
@app.get("/api/programme/status")
async def programme_status():
    """Return the indexing status provided by the embeddings module."""
    indexing_state = get_indexing_status()
    return indexing_state
@app.post("/api/programme/index")
async def index_programme(
    background_tasks: BackgroundTasks,
    programm_id: Optional[str] = Form(None),
    all_programmes: bool = Form(False),
):
    """Schedule indexing of one programme, or of all programmes, for semantic search.

    Args:
        background_tasks: Task queue used to schedule the indexing.
        programm_id: Key into ``PROGRAMME`` for indexing a single programme.
        all_programmes: If true, index every programme and ignore ``programm_id``.

    Returns:
        A dict with the status ``"indexing"`` and the affected programme id(s).

    Raises:
        HTTPException: 400 if ``all_programmes`` is false and ``programm_id``
            is missing or not a key of ``PROGRAMME``.
    """
    pdf_dir = static_dir / "referenzen"

    if all_programmes:
        programme_ids = list(PROGRAMME)

        # Index sequentially to avoid DB locks.
        # This is deliberately a plain function, not a coroutine: index_programm
        # is called without await, so as a coroutine the whole loop would block
        # the event loop. Plain callables are run in a worker thread by the
        # background-task runner, as the single-programme path below already is.
        def index_all_sequential():
            for prog_id in programme_ids:
                try:
                    index_programm(prog_id, pdf_dir)
                except Exception as e:
                    # One failing programme must not abort the remaining ones.
                    print(f"Error indexing {prog_id}: {e}")

        background_tasks.add_task(index_all_sequential)
        return {"status": "indexing", "programmes": programme_ids}

    if programm_id and programm_id in PROGRAMME:
        background_tasks.add_task(index_programm, programm_id, pdf_dir)
        return {"status": "indexing", "programm_id": programm_id}

    raise HTTPException(status_code=400, detail="Ungültiges Programm")
# Health check
@app.get("/health")
async def health():
    """Health check: report ``ok`` together with the configured app version."""
    current_version = settings.app_version
    return {"status": "ok", "version": current_version}