#!/usr/bin/env python3 """ Ketten-Match: Vergleicht Ursprungsforderung mit Blatt-Antwort via Gemini Flash. Bewertet ob Anträge tatsächlich erfüllt wurden. Überwacht Free Tier Limits (5h-Fenster, pausiert bei 85%). """ import argparse import json import os import re import sqlite3 import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from threading import Lock from typing import Optional import httpx PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db" LOG_FILE = PROJECT_ROOT / "data" / "ketten_match.log" # Gemini API GEMINI_KEY = os.environ.get("GEMINI_API_KEY") or os.popen("security find-generic-password -s gemini-api -w 2>/dev/null").read().strip() GEMINI_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={GEMINI_KEY}" MODEL = "gemini-2.5-flash" # Rate Limiting MAX_REQUESTS_PER_5H = 1500 PAUSE_THRESHOLD = 0.85 WINDOW_SECONDS = 5 * 3600 PROMPT = """Du bist ein Analyst für kommunalpolitische Vorgänge in Hagen. Vergleiche den URSPRÜNGLICHEN ANTRAG/ANFRAGE mit der ANTWORT/BESCHLUSS. Bewerte ob die ursprüngliche Forderung tatsächlich erfüllt wurde. === URSPRÜNGLICHE FORDERUNG === Aktenzeichen: {az_ursprung} Typ: {typ_ursprung} {ki_zusammenfassung} Volltext (Auszug): {volltext_ursprung} === ANTWORT / BESCHLUSS === Aktenzeichen: {az_blatt} Typ: {typ_blatt} Beschlussart: {beschlussart} Beschlusstext: {beschlusstext} Wortprotokoll: {wortprotokoll} --- Bewerte NUR als JSON: {{ "score": <0.0-1.0>, "bewertung": "erfuellt|teilweise|abgewiegelt|nebelkerze|vertagt|unklar", "begruendung": "1-2 Sätze warum", "kernpunkt_erfuellt": true/false, "details": "Was konkret beschlossen/abgelehnt wurde" }} Bewertungsskala: - 1.0: Forderung vollständig erfüllt, konkreter Beschluss - 0.7-0.9: Weitgehend erfüllt, kleine Abweichungen - 0.4-0.6: Teilweise erfüllt oder auf den Weg gebracht - 0.2-0.3: Abgewiegelt — Verwaltung weicht aus, kündigt nur "Prüfung" an - 0.0-0.1: Nebelkerze — Thema gewechselt oder komplett ignoriert""" def _score_to_bewertung(score: float) -> str: if score >= 0.8: return "erfuellt" if score >= 0.5: return "teilweise" if score >= 0.2: return "abgewiegelt" return "nebelkerze" db_lock = Lock() log_lock = Lock() stats = {"success": 0, "failed": 0, "throttled": 0, "requests": 0, "window_start": time.time()} def log(msg: str): ts = time.strftime("%H:%M:%S") line = f"[{ts}] {msg}" print(line) with log_lock: with open(LOG_FILE, "a") as f: f.write(line + "\n") def get_db(): conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row return conn def check_rate_limit(): elapsed = time.time() - stats["window_start"] if elapsed > WINDOW_SECONDS: stats["requests"] = 0 stats["window_start"] = time.time() return usage_pct = stats["requests"] / MAX_REQUESTS_PER_5H if usage_pct >= PAUSE_THRESHOLD: remaining = WINDOW_SECONDS - elapsed log(f"⏸️ 85% Limit ({stats['requests']}/{MAX_REQUESTS_PER_5H}). Pause {remaining/60:.0f} Min.") time.sleep(remaining + 10) stats["requests"] = 0 stats["window_start"] = time.time() log("▶️ Weiter") def call_gemini(prompt: str, max_retries: int = 5) -> Optional[dict]: for attempt in range(max_retries): check_rate_limit() stats["requests"] += 1 try: resp = httpx.post(GEMINI_URL, json={ "contents": [{"parts": [{"text": prompt}]}], "generationConfig": { "temperature": 0.1, "responseMimeType": "application/json", "maxOutputTokens": 2048, "thinkingConfig": {"thinkingBudget": 0}, } }, timeout=60) if resp.status_code == 429: stats["throttled"] += 1 time.sleep(min(10, attempt + 2)) continue if resp.status_code != 200: time.sleep(1) continue text = resp.json()["candidates"][0]["content"]["parts"][0]["text"].strip() # Clean markdown code fences if text.startswith("```"): text = re.sub(r'^```\w*\n?', '', text) text = re.sub(r'\n?```$', '', text) # Try full JSON parse (multiple attempts) for parse_text in [text, text.replace('\n', ' '), re.sub(r'[\x00-\x1f]', ' ', text)]: try: parsed = json.loads(parse_text) if isinstance(parsed, dict) and "score" in parsed: parsed["score"] = float(parsed["score"]) if not parsed.get("bewertung"): parsed["bewertung"] = _score_to_bewertung(parsed["score"]) if not parsed.get("begruendung"): parsed["begruendung"] = parsed.get("details", "Keine Begründung") return parsed except (json.JSONDecodeError, ValueError, TypeError): continue # Regex fallback — extract all fields score_m = re.search(r'"score"\s*:\s*([\d.]+)', text) bew_m = re.search(r'"bewertung"\s*:\s*"([^"]+)"', text) grund_m = re.search(r'"begruendung"\s*:\s*"([^"]*(?:\\.[^"]*)*)"', text) detail_m = re.search(r'"details"\s*:\s*"([^"]*(?:\\.[^"]*)*)"', text) kern_m = re.search(r'"kernpunkt_erfuellt"\s*:\s*(true|false)', text) if score_m: score = float(score_m.group(1)) return { "score": score, "bewertung": bew_m.group(1) if bew_m else _score_to_bewertung(score), "begruendung": grund_m.group(1) if grund_m else (detail_m.group(1) if detail_m else "Regex-Fallback"), "kernpunkt_erfuellt": kern_m.group(1) == "true" if kern_m else score >= 0.5, "details": detail_m.group(1) if detail_m else "", } time.sleep(0.5) except Exception: time.sleep(0.5) return None def process_kette(kette_id: int, data: dict) -> bool: prompt = PROMPT.format(**data) result = call_gemini(prompt) if not result: stats["failed"] += 1 return False score = result.get("score", 0) bewertung = result.get("bewertung", "unklar") with db_lock: conn = get_db() conn.execute(""" INSERT INTO ki_bewertungen (vorlage_id, typ, score, begruendung, anmerkungen, modell, prompt_version) VALUES (?, 'umsetzung_match', ?, ?, ?, ?, 'v1-ketten-match') """, (data["ursprung_id"], score, result.get("begruendung", ""), json.dumps(result, ensure_ascii=False), MODEL)) conn.execute("UPDATE vorlagen SET ki_status = 'matched' WHERE id = ?", (data["ursprung_id"],)) conn.commit() conn.close() stats["success"] += 1 log(f" ✓ {data['az_ursprung']} → {bewertung} ({score:.1f}): {result.get('begruendung', '')[:60]}") return True def get_pending(conn, limit): return conn.execute(""" SELECT k.id as kette_id, v_u.id as ursprung_id, v_u.aktenzeichen as az_ursprung, v_u.typ as typ_ursprung, v_u.volltext_clean as volltext_ursprung, v_b.aktenzeichen as az_blatt, v_b.typ as typ_blatt, b.beschlussart, b.beschlusstext, b.wortprotokoll, ki.begruendung as ki_zusammenfassung FROM ketten k JOIN ketten_glieder kg_u ON kg_u.kette_id = k.id AND kg_u.position = 0 JOIN vorlagen v_u ON kg_u.vorlage_id = v_u.id JOIN ketten_glieder kg_b ON kg_b.kette_id = k.id AND kg_b.position = (SELECT MAX(position) FROM ketten_glieder WHERE kette_id = k.id) JOIN vorlagen v_b ON kg_b.vorlage_id = v_b.id LEFT JOIN beratungen b ON b.vorlage_id = v_b.id AND b.beschlusstext IS NOT NULL LEFT JOIN ki_bewertungen ki ON ki.vorlage_id = v_u.id AND ki.typ = 'zusammenfassung' LEFT JOIN ki_bewertungen ki_match ON ki_match.vorlage_id = v_u.id AND ki_match.typ = 'umsetzung_match' WHERE b.beschlusstext IS NOT NULL AND v_u.volltext_clean IS NOT NULL AND v_u.volltext_clean != '' AND ki_match.id IS NULL GROUP BY k.id ORDER BY v_u.datum_eingang DESC LIMIT ? """, (limit,)).fetchall() def main(): parser = argparse.ArgumentParser() parser.add_argument("--batch-size", type=int, default=100) parser.add_argument("--workers", type=int, default=3) args = parser.parse_args() log(f"=== Ketten-Match ({MODEL}) ===") log(f"Workers: {args.workers}, Batch: {args.batch_size}") if not GEMINI_KEY: log("FEHLER: Kein API-Key!") return 1 conn = get_db() rows = get_pending(conn, args.batch_size) conn.close() pending = [{ "kette_id": r["kette_id"], "ursprung_id": r["ursprung_id"], "az_ursprung": r["az_ursprung"], "typ_ursprung": r["typ_ursprung"] or "", "volltext_ursprung": (r["volltext_ursprung"] or "")[:5000], "az_blatt": r["az_blatt"] or "", "typ_blatt": r["typ_blatt"] or "", "beschlussart": r["beschlussart"] or "", "beschlusstext": (r["beschlusstext"] or "")[:2000], "wortprotokoll": (r["wortprotokoll"] or "")[:2000], "ki_zusammenfassung": (r["ki_zusammenfassung"] or "")[:500], } for r in rows] log(f"Zu verarbeiten: {len(pending)}") if not pending: log("Alle fertig!") return 0 start = time.time() with ThreadPoolExecutor(max_workers=args.workers) as executor: futures = {executor.submit(process_kette, p["kette_id"], p): p["az_ursprung"] for p in pending} for f in as_completed(futures): try: f.result() except Exception as e: log(f" ✗ {futures[f]}: {e}") elapsed = time.time() - start log(f"\n=== Batch fertig ===") log(f"✓ {stats['success']} | ✗ {stats['failed']} | ⏳ {stats['throttled']} throttled") log(f"Dauer: {elapsed:.0f}s | {stats['success']/max(elapsed,1):.2f} docs/sec") log(f"5h-Fenster: {stats['requests']}/{MAX_REQUESTS_PER_5H} ({stats['requests']*100/MAX_REQUESTS_PER_5H:.0f}%)") return 0 if not pending or stats["success"] >= len(pending) else 1 if __name__ == "__main__": sys.exit(main())