from __future__ import annotations """API routes for KI re-evaluation.""" import json import os import sqlite3 import threading from datetime import datetime import httpx from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from tracker.db.session import get_connection # Note: Re-evaluation should re-scrape ALLRIS data before KI evaluation # to exclude transfer errors. See Gitea issue for full spec. # IMPORTANT: Destructive changes (deleting old bewertungen etc.) only after # explicit user request with explanation. Keep old data as backup where possible. router = APIRouter(prefix="/bewertung", tags=["bewertung"]) def _db(): conn = get_connection() try: yield conn finally: conn.close() # API config DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions" GEMINI_URL_TEMPLATE = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={key}" # Job tracking _jobs: dict[str, dict] = {} def _get_key(name: str) -> str: """Get API key from env or macOS keychain.""" val = os.environ.get(name) if val: return val import subprocess keychain_map = {"QWEN_API_KEY": "qwen-api", "GEMINI_API_KEY": "gemini-api"} svc = keychain_map.get(name) if svc: try: return subprocess.check_output( ["security", "find-generic-password", "-s", svc, "-w"], stderr=subprocess.DEVNULL, ).decode().strip() except Exception: pass return "" ZUSAMMENFASSUNG_PROMPT = """Analysiere diesen kommunalpolitischen Antrag aus Hagen. ZUSÄTZLICHE HINWEISE DES NUTZERS: {anmerkung} DOKUMENT: {volltext} --- Erstelle eine strukturierte Zusammenfassung im JSON-Format: {{ "zusammenfassung": "2-3 Sätze, was gefordert wird", "kernforderung": "Die zentrale Forderung in einem Satz", "begruendung": "Warum wird das gefordert? (kurz)", "thema": "Hauptthema (z.B. Verkehr, Soziales, Umwelt)", "partei": "Antragstellende Fraktion falls erkennbar", "orte": [] }} NUR JSON ausgeben, keine Erklärungen.""" KETTEN_MATCH_PROMPT = """Du bist ein Analyst für kommunalpolitische Vorgänge in Hagen. ZUSÄTZLICHE HINWEISE DES NUTZERS: {anmerkung} Vergleiche den URSPRÜNGLICHEN ANTRAG/ANFRAGE mit der ANTWORT/BESCHLUSS. Bewerte ob die ursprüngliche Forderung tatsächlich erfüllt wurde. === URSPRÜNGLICHE FORDERUNG === Aktenzeichen: {az_ursprung} {ki_zusammenfassung} Volltext: {volltext_ursprung} === BERATUNGEN & BESCHLÜSSE === {beratungen_text} --- Bewerte NUR als JSON: {{ "score": <0.0-1.0>, "bewertung": "erfuellt|teilweise|abgewiegelt|nebelkerze|vertagt|unklar", "begruendung": "1-2 Sätze warum", "kernpunkt_erfuellt": true/false, "details": "Was konkret beschlossen/abgelehnt wurde" }} Bewertungsskala: - 1.0: Forderung vollständig erfüllt, konkreter Beschluss - 0.7-0.9: Weitgehend erfüllt, kleine Abweichungen - 0.4-0.6: Teilweise erfüllt oder auf den Weg gebracht - 0.2-0.3: Abgewiegelt — Verwaltung weicht aus, kündigt nur "Prüfung" an - 0.0-0.1: Nebelkerze — Thema gewechselt oder komplett ignoriert""" class BewertungRequest(BaseModel): anmerkung: str = "" def _call_qwen(prompt: str) -> dict | None: key = _get_key("QWEN_API_KEY") if not key: return None try: resp = httpx.post( DASHSCOPE_URL, headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, json={"model": "qwen-plus-latest", "messages": [{"role": "user", "content": prompt}], "temperature": 0.3}, timeout=180, ) resp.raise_for_status() content = resp.json()["choices"][0]["message"]["content"] if "```json" in content: content = content.split("```json")[1].split("```")[0] elif "```" in content: content = content.split("```")[1].split("```")[0] return json.loads(content.strip()) except Exception as e: return {"error": str(e)} def _run_zusammenfassung(vorlage_id: int, anmerkung: str, job_id: str): """Background task: re-evaluate a Vorlage.""" try: conn = get_connection() row = conn.execute("SELECT volltext_clean, aktenzeichen FROM vorlagen WHERE id = ?", (vorlage_id,)).fetchone() if not row or not row["volltext_clean"]: _jobs[job_id] = {"status": "error", "error": "Kein Volltext vorhanden"} return volltext = row["volltext_clean"] if len(volltext) > 12000: volltext = volltext[:12000] + "\n[...gekürzt...]" prompt = ZUSAMMENFASSUNG_PROMPT.format(anmerkung=anmerkung or "(keine)", volltext=volltext) result = _call_qwen(prompt) if not result or "error" in result: _jobs[job_id] = {"status": "error", "error": str(result)} return # Keep old versions, insert new conn.execute( """INSERT INTO ki_bewertungen (vorlage_id, typ, begruendung, anmerkungen, modell, prompt_version, erstellt_at) VALUES (?, 'zusammenfassung', ?, ?, 'qwen-plus-latest', 'v2-reeval', ?)""", (vorlage_id, result.get("zusammenfassung"), json.dumps(result, ensure_ascii=False), datetime.now().isoformat()), ) if result.get("kernforderung"): conn.execute("UPDATE vorlagen SET thema_kurz = ? WHERE id = ?", (result["kernforderung"][:200], vorlage_id)) conn.commit() # Auto-trigger Ketten-Bewertung wenn Vorlage in einer Kette ist kette_row = conn.execute( "SELECT kette_id FROM ketten_glieder WHERE vorlage_id = ? LIMIT 1", (vorlage_id,), ).fetchone() conn.close() if kette_row: _jobs[job_id] = {"status": "running", "result": result, "phase": "umsetzung"} _run_ketten_bewertung(kette_row["kette_id"], anmerkung, job_id) else: _jobs[job_id] = {"status": "done", "result": result} except Exception as e: _jobs[job_id] = {"status": "error", "error": str(e)} def _run_ketten_bewertung(kette_id: int, anmerkung: str, job_id: str): """Background task: re-evaluate a Kette.""" try: conn = get_connection() # Get kette + ursprung kette = conn.execute( "SELECT k.*, v.aktenzeichen, v.volltext_clean FROM ketten k JOIN vorlagen v ON k.ursprung_id = v.id WHERE k.id = ?", (kette_id,), ).fetchone() if not kette: _jobs[job_id] = {"status": "error", "error": "Kette nicht gefunden"} return # Get KI summary of ursprung ki_row = conn.execute( "SELECT anmerkungen FROM ki_bewertungen WHERE vorlage_id = ? AND typ = 'zusammenfassung' LIMIT 1", (kette["ursprung_id"],), ).fetchone() ki_text = "" if ki_row and ki_row["anmerkungen"]: try: ki = json.loads(ki_row["anmerkungen"]) ki_text = f"Zusammenfassung: {ki.get('zusammenfassung', '')}\nKernforderung: {ki.get('kernforderung', '')}" except Exception: pass # Collect beratungen members = conn.execute( "SELECT vorlage_id FROM ketten_glieder WHERE kette_id = ?", (kette_id,) ).fetchall() member_ids = [m["vorlage_id"] for m in members] placeholders = ",".join("?" * len(member_ids)) beratungen = conn.execute( f"""SELECT b.sitzung_datum, b.rolle, b.ergebnis, b.beschlusstext, b.wortprotokoll, v.aktenzeichen FROM beratungen b JOIN vorlagen v ON b.vorlage_id = v.id WHERE b.vorlage_id IN ({placeholders}) ORDER BY b.sitzung_datum""", member_ids, ).fetchall() beratungen_text = "" for b in beratungen: beratungen_text += f"\n--- {b['aktenzeichen']} | {b['sitzung_datum']} | {b['rolle']} ---\n" if b["ergebnis"]: beratungen_text += f"Ergebnis: {b['ergebnis']}\n" if b["beschlusstext"]: beratungen_text += f"Beschlusstext: {b['beschlusstext'][:500]}\n" if b["wortprotokoll"]: beratungen_text += f"Wortprotokoll: {b['wortprotokoll'][:1000]}\n" volltext = kette["volltext_clean"] or "" if len(volltext) > 8000: volltext = volltext[:8000] + "\n[...gekürzt...]" prompt = KETTEN_MATCH_PROMPT.format( anmerkung=anmerkung or "(keine)", az_ursprung=kette["aktenzeichen"], ki_zusammenfassung=ki_text, volltext_ursprung=volltext, beratungen_text=beratungen_text or "(keine Beratungen vorhanden)", ) result = _call_qwen(prompt) if not result or "error" in result: _jobs[job_id] = {"status": "error", "error": str(result)} return # Keep old versions, insert new conn.execute( """INSERT INTO ki_bewertungen (vorlage_id, typ, score, begruendung, anmerkungen, modell, prompt_version, erstellt_at) VALUES (?, 'umsetzung_match', ?, ?, ?, 'qwen-plus-latest', 'v2-reeval', ?)""", ( kette["ursprung_id"], result.get("score"), result.get("begruendung"), json.dumps(result, ensure_ascii=False), datetime.now().isoformat(), ), ) # Update chain status based on KI score score = result.get("score", 0) bewertung = result.get("bewertung", "") # Map KI bewertung → Ketten-Status if score >= 0.7: new_status = "umgesetzt" begruendung = f"KI-Bewertung: {score*100:.0f}% umgesetzt. {result.get('begruendung', '')}" elif score >= 0.4: new_status = "teilweise_umgesetzt" begruendung = f"KI-Bewertung: {score*100:.0f}% teilweise umgesetzt. {result.get('begruendung', '')}" elif bewertung == "abgewiegelt" or bewertung == "nebelkerze": new_status = "abgewiegelt" begruendung = f"KI-Bewertung: {score*100:.0f}% — {bewertung}. {result.get('begruendung', '')}" elif score < 0.3: new_status = "versandet" begruendung = f"KI-Bewertung: {score*100:.0f}%. {result.get('begruendung', '')}" else: new_status = kette["status"] # Keep current begruendung = kette["begruendung"] conn.execute( "UPDATE ketten SET status = ?, begruendung = ? WHERE id = ?", (new_status, begruendung, kette_id), ) conn.commit() conn.close() _jobs[job_id] = {"status": "done", "result": result} except Exception as e: _jobs[job_id] = {"status": "error", "error": str(e)} @router.post("/vorlagen/{vorlage_id}") def reeval_vorlage(vorlage_id: int, req: BewertungRequest, conn=Depends(_db)): """Trigger KI re-evaluation for a Vorlage.""" row = conn.execute("SELECT id FROM vorlagen WHERE id = ?", (vorlage_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Vorlage nicht gefunden") job_id = f"vorlage-{vorlage_id}-{int(datetime.now().timestamp())}" _jobs[job_id] = {"status": "running"} t = threading.Thread(target=_run_zusammenfassung, args=(vorlage_id, req.anmerkung, job_id), daemon=True) t.start() return {"job_id": job_id, "status": "running"} @router.post("/ketten/{kette_id}") def reeval_kette(kette_id: int, req: BewertungRequest, conn=Depends(_db)): """Trigger KI re-evaluation for a Kette.""" row = conn.execute("SELECT id FROM ketten WHERE id = ?", (kette_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail="Kette nicht gefunden") job_id = f"kette-{kette_id}-{int(datetime.now().timestamp())}" _jobs[job_id] = {"status": "running"} t = threading.Thread(target=_run_ketten_bewertung, args=(kette_id, req.anmerkung, job_id), daemon=True) t.start() return {"job_id": job_id, "status": "running"} @router.get("/status/{job_id}") def get_job_status(job_id: str): """Check status of a re-evaluation job.""" if job_id not in _jobs: raise HTTPException(status_code=404, detail="Job nicht gefunden") return _jobs[job_id]