antragstracker/scripts/ketten_match.py

282 lines
10 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Ketten-Match: Vergleicht Ursprungsforderung mit Blatt-Antwort via Gemini Flash.
Bewertet ob Anträge tatsächlich erfüllt wurden.
Überwacht Free Tier Limits (5h-Fenster, pausiert bei 85%).
"""
import argparse
import json
import os
import re
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
from typing import Optional
import httpx
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
LOG_FILE = PROJECT_ROOT / "data" / "ketten_match.log"
# Gemini API
GEMINI_KEY = os.environ.get("GEMINI_API_KEY") or os.popen("security find-generic-password -s gemini-api -w 2>/dev/null").read().strip()
GEMINI_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}"
MODEL = "gemini-2.5-flash"
# Rate Limiting
MAX_REQUESTS_PER_5H = 1500
PAUSE_THRESHOLD = 0.85
WINDOW_SECONDS = 5 * 3600
PROMPT = """Du bist ein Analyst für kommunalpolitische Vorgänge in Hagen.
Vergleiche den URSPRÜNGLICHEN ANTRAG/ANFRAGE mit der ANTWORT/BESCHLUSS.
Bewerte ob die ursprüngliche Forderung tatsächlich erfüllt wurde.
=== URSPRÜNGLICHE FORDERUNG ===
Aktenzeichen: {az_ursprung}
Typ: {typ_ursprung}
{ki_zusammenfassung}
Volltext (Auszug):
{volltext_ursprung}
=== ANTWORT / BESCHLUSS ===
Aktenzeichen: {az_blatt}
Typ: {typ_blatt}
Beschlussart: {beschlussart}
Beschlusstext:
{beschlusstext}
Wortprotokoll:
{wortprotokoll}
---
Bewerte NUR als JSON:
{{
"score": <0.0-1.0>,
"bewertung": "erfuellt|teilweise|abgewiegelt|nebelkerze|vertagt|unklar",
"begruendung": "1-2 Sätze warum",
"kernpunkt_erfuellt": true/false,
"details": "Was konkret beschlossen/abgelehnt wurde"
}}
Bewertungsskala:
- 1.0: Forderung vollständig erfüllt, konkreter Beschluss
- 0.7-0.9: Weitgehend erfüllt, kleine Abweichungen
- 0.4-0.6: Teilweise erfüllt oder auf den Weg gebracht
- 0.2-0.3: Abgewiegelt Verwaltung weicht aus, kündigt nur "Prüfung" an
- 0.0-0.1: Nebelkerze Thema gewechselt oder komplett ignoriert"""
def _score_to_bewertung(score: float) -> str:
if score >= 0.8: return "erfuellt"
if score >= 0.5: return "teilweise"
if score >= 0.2: return "abgewiegelt"
return "nebelkerze"
db_lock = Lock()
log_lock = Lock()
stats = {"success": 0, "failed": 0, "throttled": 0, "requests": 0, "window_start": time.time()}
def log(msg: str):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
with log_lock:
with open(LOG_FILE, "a") as f:
f.write(line + "\n")
def get_db():
conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def check_rate_limit():
elapsed = time.time() - stats["window_start"]
if elapsed > WINDOW_SECONDS:
stats["requests"] = 0
stats["window_start"] = time.time()
return
usage_pct = stats["requests"] / MAX_REQUESTS_PER_5H
if usage_pct >= PAUSE_THRESHOLD:
remaining = WINDOW_SECONDS - elapsed
log(f"⏸️ 85% Limit ({stats['requests']}/{MAX_REQUESTS_PER_5H}). Pause {remaining/60:.0f} Min.")
time.sleep(remaining + 10)
stats["requests"] = 0
stats["window_start"] = time.time()
log("▶️ Weiter")
def call_gemini(prompt: str, max_retries: int = 5) -> Optional[dict]:
for attempt in range(max_retries):
check_rate_limit()
stats["requests"] += 1
try:
resp = httpx.post(GEMINI_URL, json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"temperature": 0.1,
"responseMimeType": "application/json",
"maxOutputTokens": 2048,
"thinkingConfig": {"thinkingBudget": 0},
}
}, timeout=60)
if resp.status_code == 429:
stats["throttled"] += 1
time.sleep(min(10, attempt + 2))
continue
if resp.status_code != 200:
time.sleep(1)
continue
text = resp.json()["candidates"][0]["content"]["parts"][0]["text"].strip()
# Clean markdown code fences
if text.startswith("```"):
text = re.sub(r'^```\w*\n?', '', text)
text = re.sub(r'\n?```$', '', text)
# Try full JSON parse (multiple attempts)
for parse_text in [text, text.replace('\n', ' '), re.sub(r'[\x00-\x1f]', ' ', text)]:
try:
parsed = json.loads(parse_text)
if isinstance(parsed, dict) and "score" in parsed:
parsed["score"] = float(parsed["score"])
if not parsed.get("bewertung"):
parsed["bewertung"] = _score_to_bewertung(parsed["score"])
if not parsed.get("begruendung"):
parsed["begruendung"] = parsed.get("details", "Keine Begründung")
return parsed
except (json.JSONDecodeError, ValueError, TypeError):
continue
# Regex fallback — extract all fields
score_m = re.search(r'"score"\s*:\s*([\d.]+)', text)
bew_m = re.search(r'"bewertung"\s*:\s*"([^"]+)"', text)
grund_m = re.search(r'"begruendung"\s*:\s*"([^"]*(?:\\.[^"]*)*)"', text)
detail_m = re.search(r'"details"\s*:\s*"([^"]*(?:\\.[^"]*)*)"', text)
kern_m = re.search(r'"kernpunkt_erfuellt"\s*:\s*(true|false)', text)
if score_m:
score = float(score_m.group(1))
return {
"score": score,
"bewertung": bew_m.group(1) if bew_m else _score_to_bewertung(score),
"begruendung": grund_m.group(1) if grund_m else (detail_m.group(1) if detail_m else "Regex-Fallback"),
"kernpunkt_erfuellt": kern_m.group(1) == "true" if kern_m else score >= 0.5,
"details": detail_m.group(1) if detail_m else "",
}
time.sleep(0.5)
except Exception:
time.sleep(0.5)
return None
def process_kette(kette_id: int, data: dict) -> bool:
prompt = PROMPT.format(**data)
result = call_gemini(prompt)
if not result:
stats["failed"] += 1
return False
score = result.get("score", 0)
bewertung = result.get("bewertung", "unklar")
with db_lock:
conn = get_db()
conn.execute("""
INSERT INTO ki_bewertungen (vorlage_id, typ, score, begruendung, anmerkungen, modell, prompt_version)
VALUES (?, 'umsetzung_match', ?, ?, ?, ?, 'v1-ketten-match')
""", (data["ursprung_id"], score, result.get("begruendung", ""), json.dumps(result, ensure_ascii=False), MODEL))
conn.execute("UPDATE vorlagen SET ki_status = 'matched' WHERE id = ?", (data["ursprung_id"],))
conn.commit()
conn.close()
stats["success"] += 1
log(f"{data['az_ursprung']}{bewertung} ({score:.1f}): {result.get('begruendung', '')[:60]}")
return True
def get_pending(conn, limit):
return conn.execute("""
SELECT k.id as kette_id, v_u.id as ursprung_id,
v_u.aktenzeichen as az_ursprung, v_u.typ as typ_ursprung,
v_u.volltext_clean as volltext_ursprung,
v_b.aktenzeichen as az_blatt, v_b.typ as typ_blatt,
b.beschlussart, b.beschlusstext, b.wortprotokoll,
ki.begruendung as ki_zusammenfassung
FROM ketten k
JOIN ketten_glieder kg_u ON kg_u.kette_id = k.id AND kg_u.position = 0
JOIN vorlagen v_u ON kg_u.vorlage_id = v_u.id
JOIN ketten_glieder kg_b ON kg_b.kette_id = k.id
AND kg_b.position = (SELECT MAX(position) FROM ketten_glieder WHERE kette_id = k.id)
JOIN vorlagen v_b ON kg_b.vorlage_id = v_b.id
LEFT JOIN beratungen b ON b.vorlage_id = v_b.id AND b.beschlusstext IS NOT NULL
LEFT JOIN ki_bewertungen ki ON ki.vorlage_id = v_u.id AND ki.typ = 'zusammenfassung'
LEFT JOIN ki_bewertungen ki_match ON ki_match.vorlage_id = v_u.id AND ki_match.typ = 'umsetzung_match'
WHERE b.beschlusstext IS NOT NULL
AND v_u.volltext_clean IS NOT NULL AND v_u.volltext_clean != ''
AND ki_match.id IS NULL
GROUP BY k.id
ORDER BY v_u.datum_eingang DESC
LIMIT ?
""", (limit,)).fetchall()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--batch-size", type=int, default=100)
parser.add_argument("--workers", type=int, default=3)
args = parser.parse_args()
log(f"=== Ketten-Match ({MODEL}) ===")
log(f"Workers: {args.workers}, Batch: {args.batch_size}")
if not GEMINI_KEY:
log("FEHLER: Kein API-Key!")
return 1
conn = get_db()
rows = get_pending(conn, args.batch_size)
conn.close()
pending = [{
"kette_id": r["kette_id"], "ursprung_id": r["ursprung_id"],
"az_ursprung": r["az_ursprung"], "typ_ursprung": r["typ_ursprung"] or "",
"volltext_ursprung": (r["volltext_ursprung"] or "")[:5000],
"az_blatt": r["az_blatt"] or "", "typ_blatt": r["typ_blatt"] or "",
"beschlussart": r["beschlussart"] or "",
"beschlusstext": (r["beschlusstext"] or "")[:2000],
"wortprotokoll": (r["wortprotokoll"] or "")[:2000],
"ki_zusammenfassung": (r["ki_zusammenfassung"] or "")[:500],
} for r in rows]
log(f"Zu verarbeiten: {len(pending)}")
if not pending:
log("Alle fertig!")
return 0
start = time.time()
with ThreadPoolExecutor(max_workers=args.workers) as executor:
futures = {executor.submit(process_kette, p["kette_id"], p): p["az_ursprung"] for p in pending}
for f in as_completed(futures):
try: f.result()
except Exception as e: log(f"{futures[f]}: {e}")
elapsed = time.time() - start
log(f"\n=== Batch fertig ===")
log(f"{stats['success']} | ✗ {stats['failed']} | ⏳ {stats['throttled']} throttled")
log(f"Dauer: {elapsed:.0f}s | {stats['success']/max(elapsed,1):.2f} docs/sec")
log(f"5h-Fenster: {stats['requests']}/{MAX_REQUESTS_PER_5H} ({stats['requests']*100/MAX_REQUESTS_PER_5H:.0f}%)")
return 0 if not pending or stats["success"] >= len(pending) else 1
if __name__ == "__main__":
sys.exit(main())