antragstracker/backend/src/tracker/api/routes/bewertung.py

310 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
"""API routes for KI re-evaluation."""
import json
import os
import sqlite3
import threading
from datetime import datetime
import httpx
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from tracker.db.session import get_connection
# Note: Re-evaluation should re-scrape ALLRIS data before KI evaluation
# to exclude transfer errors. See Gitea issue for full spec.
# IMPORTANT: Destructive changes (deleting old bewertungen etc.) only after
# explicit user request with explanation. Keep old data as backup where possible.
router = APIRouter(prefix="/bewertung", tags=["bewertung"])
def _db():
conn = get_connection()
try:
yield conn
finally:
conn.close()
# API config
DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions"
GEMINI_URL_TEMPLATE = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={key}"
# Job tracking
_jobs: dict[str, dict] = {}
def _get_key(name: str) -> str:
"""Get API key from env or macOS keychain."""
val = os.environ.get(name)
if val:
return val
import subprocess
keychain_map = {"QWEN_API_KEY": "qwen-api", "GEMINI_API_KEY": "gemini-api"}
svc = keychain_map.get(name)
if svc:
try:
return subprocess.check_output(
["security", "find-generic-password", "-s", svc, "-w"],
stderr=subprocess.DEVNULL,
).decode().strip()
except Exception:
pass
return ""
ZUSAMMENFASSUNG_PROMPT = """Analysiere diesen kommunalpolitischen Antrag aus Hagen.
ZUSÄTZLICHE HINWEISE DES NUTZERS:
{anmerkung}
DOKUMENT:
{volltext}
---
Erstelle eine strukturierte Zusammenfassung im JSON-Format:
{{
"zusammenfassung": "2-3 Sätze, was gefordert wird",
"kernforderung": "Die zentrale Forderung in einem Satz",
"begruendung": "Warum wird das gefordert? (kurz)",
"thema": "Hauptthema (z.B. Verkehr, Soziales, Umwelt)",
"partei": "Antragstellende Fraktion falls erkennbar",
"orte": []
}}
NUR JSON ausgeben, keine Erklärungen."""
KETTEN_MATCH_PROMPT = """Du bist ein Analyst für kommunalpolitische Vorgänge in Hagen.
ZUSÄTZLICHE HINWEISE DES NUTZERS:
{anmerkung}
Vergleiche den URSPRÜNGLICHEN ANTRAG/ANFRAGE mit der ANTWORT/BESCHLUSS.
Bewerte ob die ursprüngliche Forderung tatsächlich erfüllt wurde.
=== URSPRÜNGLICHE FORDERUNG ===
Aktenzeichen: {az_ursprung}
{ki_zusammenfassung}
Volltext:
{volltext_ursprung}
=== BERATUNGEN & BESCHLÜSSE ===
{beratungen_text}
---
Bewerte NUR als JSON:
{{
"score": <0.0-1.0>,
"bewertung": "erfuellt|teilweise|abgewiegelt|nebelkerze|vertagt|unklar",
"begruendung": "1-2 Sätze warum",
"kernpunkt_erfuellt": true/false,
"details": "Was konkret beschlossen/abgelehnt wurde"
}}
Bewertungsskala:
- 1.0: Forderung vollständig erfüllt, konkreter Beschluss
- 0.7-0.9: Weitgehend erfüllt, kleine Abweichungen
- 0.4-0.6: Teilweise erfüllt oder auf den Weg gebracht
- 0.2-0.3: Abgewiegelt Verwaltung weicht aus, kündigt nur "Prüfung" an
- 0.0-0.1: Nebelkerze Thema gewechselt oder komplett ignoriert"""
class BewertungRequest(BaseModel):
anmerkung: str = ""
def _call_qwen(prompt: str) -> dict | None:
key = _get_key("QWEN_API_KEY")
if not key:
return None
try:
resp = httpx.post(
DASHSCOPE_URL,
headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
json={"model": "qwen-plus-latest", "messages": [{"role": "user", "content": prompt}], "temperature": 0.3},
timeout=180,
)
resp.raise_for_status()
content = resp.json()["choices"][0]["message"]["content"]
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
elif "```" in content:
content = content.split("```")[1].split("```")[0]
return json.loads(content.strip())
except Exception as e:
return {"error": str(e)}
def _run_zusammenfassung(vorlage_id: int, anmerkung: str, job_id: str):
"""Background task: re-evaluate a Vorlage."""
try:
conn = get_connection()
row = conn.execute("SELECT volltext_clean, aktenzeichen FROM vorlagen WHERE id = ?", (vorlage_id,)).fetchone()
if not row or not row["volltext_clean"]:
_jobs[job_id] = {"status": "error", "error": "Kein Volltext vorhanden"}
return
volltext = row["volltext_clean"]
if len(volltext) > 12000:
volltext = volltext[:12000] + "\n[...gekürzt...]"
prompt = ZUSAMMENFASSUNG_PROMPT.format(anmerkung=anmerkung or "(keine)", volltext=volltext)
result = _call_qwen(prompt)
if not result or "error" in result:
_jobs[job_id] = {"status": "error", "error": str(result)}
return
# Keep old versions, insert new
conn.execute(
"""INSERT INTO ki_bewertungen (vorlage_id, typ, begruendung, anmerkungen, modell, prompt_version, erstellt_at)
VALUES (?, 'zusammenfassung', ?, ?, 'qwen-plus-latest', 'v2-reeval', ?)""",
(vorlage_id, result.get("zusammenfassung"), json.dumps(result, ensure_ascii=False),
datetime.now().isoformat()),
)
if result.get("kernforderung"):
conn.execute("UPDATE vorlagen SET thema_kurz = ? WHERE id = ?", (result["kernforderung"][:200], vorlage_id))
conn.commit()
conn.close()
_jobs[job_id] = {"status": "done", "result": result}
except Exception as e:
_jobs[job_id] = {"status": "error", "error": str(e)}
def _run_ketten_bewertung(kette_id: int, anmerkung: str, job_id: str):
"""Background task: re-evaluate a Kette."""
try:
conn = get_connection()
# Get kette + ursprung
kette = conn.execute(
"SELECT k.*, v.aktenzeichen, v.volltext_clean FROM ketten k JOIN vorlagen v ON k.ursprung_id = v.id WHERE k.id = ?",
(kette_id,),
).fetchone()
if not kette:
_jobs[job_id] = {"status": "error", "error": "Kette nicht gefunden"}
return
# Get KI summary of ursprung
ki_row = conn.execute(
"SELECT anmerkungen FROM ki_bewertungen WHERE vorlage_id = ? AND typ = 'zusammenfassung' LIMIT 1",
(kette["ursprung_id"],),
).fetchone()
ki_text = ""
if ki_row and ki_row["anmerkungen"]:
try:
ki = json.loads(ki_row["anmerkungen"])
ki_text = f"Zusammenfassung: {ki.get('zusammenfassung', '')}\nKernforderung: {ki.get('kernforderung', '')}"
except Exception:
pass
# Collect beratungen
members = conn.execute(
"SELECT vorlage_id FROM ketten_glieder WHERE kette_id = ?", (kette_id,)
).fetchall()
member_ids = [m["vorlage_id"] for m in members]
placeholders = ",".join("?" * len(member_ids))
beratungen = conn.execute(
f"""SELECT b.sitzung_datum, b.rolle, b.ergebnis, b.beschlusstext, b.wortprotokoll,
v.aktenzeichen
FROM beratungen b JOIN vorlagen v ON b.vorlage_id = v.id
WHERE b.vorlage_id IN ({placeholders})
ORDER BY b.sitzung_datum""",
member_ids,
).fetchall()
beratungen_text = ""
for b in beratungen:
beratungen_text += f"\n--- {b['aktenzeichen']} | {b['sitzung_datum']} | {b['rolle']} ---\n"
if b["ergebnis"]:
beratungen_text += f"Ergebnis: {b['ergebnis']}\n"
if b["beschlusstext"]:
beratungen_text += f"Beschlusstext: {b['beschlusstext'][:500]}\n"
if b["wortprotokoll"]:
beratungen_text += f"Wortprotokoll: {b['wortprotokoll'][:1000]}\n"
volltext = kette["volltext_clean"] or ""
if len(volltext) > 8000:
volltext = volltext[:8000] + "\n[...gekürzt...]"
prompt = KETTEN_MATCH_PROMPT.format(
anmerkung=anmerkung or "(keine)",
az_ursprung=kette["aktenzeichen"],
ki_zusammenfassung=ki_text,
volltext_ursprung=volltext,
beratungen_text=beratungen_text or "(keine Beratungen vorhanden)",
)
result = _call_qwen(prompt)
if not result or "error" in result:
_jobs[job_id] = {"status": "error", "error": str(result)}
return
# Keep old versions, insert new
conn.execute(
"""INSERT INTO ki_bewertungen (vorlage_id, typ, score, begruendung, anmerkungen, modell, prompt_version, erstellt_at)
VALUES (?, 'umsetzung_match', ?, ?, ?, 'qwen-plus-latest', 'v2-reeval', ?)""",
(
kette["ursprung_id"],
result.get("score"),
result.get("begruendung"),
json.dumps(result, ensure_ascii=False),
datetime.now().isoformat(),
),
)
# Rebuild chain status
from tracker.core.chains import build_single_chain
build_single_chain(conn, kette["ursprung_id"])
conn.commit()
conn.close()
_jobs[job_id] = {"status": "done", "result": result}
except Exception as e:
_jobs[job_id] = {"status": "error", "error": str(e)}
@router.post("/vorlagen/{vorlage_id}")
def reeval_vorlage(vorlage_id: int, req: BewertungRequest, conn=Depends(_db)):
"""Trigger KI re-evaluation for a Vorlage."""
row = conn.execute("SELECT id FROM vorlagen WHERE id = ?", (vorlage_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Vorlage nicht gefunden")
job_id = f"vorlage-{vorlage_id}-{int(datetime.now().timestamp())}"
_jobs[job_id] = {"status": "running"}
t = threading.Thread(target=_run_zusammenfassung, args=(vorlage_id, req.anmerkung, job_id), daemon=True)
t.start()
return {"job_id": job_id, "status": "running"}
@router.post("/ketten/{kette_id}")
def reeval_kette(kette_id: int, req: BewertungRequest, conn=Depends(_db)):
"""Trigger KI re-evaluation for a Kette."""
row = conn.execute("SELECT id FROM ketten WHERE id = ?", (kette_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Kette nicht gefunden")
job_id = f"kette-{kette_id}-{int(datetime.now().timestamp())}"
_jobs[job_id] = {"status": "running"}
t = threading.Thread(target=_run_ketten_bewertung, args=(kette_id, req.anmerkung, job_id), daemon=True)
t.start()
return {"job_id": job_id, "status": "running"}
@router.get("/status/{job_id}")
def get_job_status(job_id: str):
"""Check status of a re-evaluation job."""
if job_id not in _jobs:
raise HTTPException(status_code=404, detail="Job nicht gefunden")
return _jobs[job_id]