#!/usr/bin/env python3
"""Parallelized AI summaries via Qwen/DashScope.

Uses a ThreadPoolExecutor for parallel API calls. Designed to be re-invoked
in a loop: each run processes one batch and exits 0 only when nothing is left.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock

import httpx

PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
LOG_FILE = PROJECT_ROOT / "data" / "ki_parallel.log"
STATE_FILE = PROJECT_ROOT / "data" / "ki_parallel_state.json"

# DashScope API
DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions"
# Key from the environment, falling back to the macOS keychain. stderr is
# silenced so a missing `security` binary on other platforms yields "".
DASHSCOPE_KEY = os.environ.get("QWEN_API_KEY") or os.popen(
    "security find-generic-password -s qwen-api -w 2>/dev/null"
).read().strip()

# Nominatim for geocoding
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
USER_AGENT = "Antragstracker-Hagen/1.0"
HAGEN_BBOX = "7.35,51.30,7.65,51.45"

PROMPT_TEMPLATE = """Analysiere diesen kommunalpolitischen Antrag aus Hagen.

DOKUMENT:
{volltext}

---

Erstelle eine strukturierte Zusammenfassung im JSON-Format:
{{
  "zusammenfassung": "2-3 Sätze, was gefordert wird",
  "kernforderung": "Die zentrale Forderung in einem Satz",
  "begruendung": "Warum wird das gefordert? (kurz)",
  "thema": "Hauptthema (z.B. Verkehr, Soziales, Umwelt)",
  "partei": "Antragstellende Fraktion falls erkennbar",
  "orte": [
    {{
      "rohtext": "Die genaue Formulierung im Text",
      "kontext": "Der Satz in dem der Ort erwähnt wird",
      "typ": "strasse|platz|stadtteil|gebaeude|sonstiges",
      "geocodierbar": true/false,
      "geocode_query": "Suchbegriff für Karte"
    }}
  ]
}}

NUR JSON ausgeben, keine Erklärungen."""

db_lock = Lock()
log_lock = Lock()
# Shared progress counters. Mutated only via `int +=` from worker threads,
# which is adequate for statistics under CPython's GIL.
stats = {"success": 0, "failed": 0, "throttled": 0}


def log(msg: str):
    """Print *msg* with a timestamp and append it to LOG_FILE (thread-safe)."""
    timestamp = time.strftime("%H:%M:%S")
    line = f"[{timestamp}] {msg}"
    print(line)
    with log_lock:
        # Explicit UTF-8 so the ✓/✗/⏳ glyphs never crash on a C locale.
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")


def get_db():
    """Open a fresh SQLite connection with Row access, usable across threads."""
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


def call_qwen(prompt: str, max_retries: int = 5) -> dict | None:
    """Call the Qwen API with fast, frequent retries.

    Returns the parsed JSON object from the model, or None once all
    *max_retries* attempts have failed.
    """
    for attempt in range(max_retries):
        try:
            resp = httpx.post(
                DASHSCOPE_URL,
                headers={
                    "Authorization": f"Bearer {DASHSCOPE_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": "qwen-plus-latest",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1,
                    "max_tokens": 2000,
                    "response_format": {"type": "json_object"},
                },
                timeout=30,
            )
            if resp.status_code == 429:
                # Rate limited: back off linearly, capped at 5 seconds.
                wait = min(5, attempt + 1)
                stats["throttled"] += 1
                time.sleep(wait)
                continue
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            content = content.strip()
            # Strip Markdown code fences the model occasionally wraps around JSON.
            if content.startswith("```"):
                content = re.sub(r'^```\w*\n?', '', content)
                content = re.sub(r'\n?```$', '', content)
            return json.loads(content)
        except json.JSONDecodeError:
            time.sleep(0.5)
        except httpx.HTTPStatusError:
            time.sleep(1)
        except httpx.TimeoutException:
            time.sleep(0.5)
        except Exception:
            time.sleep(0.5)
    # Fix: report the actual retry count instead of a hard-coded "5".
    log(f" ✗ {max_retries} Retries fehlgeschlagen")
    return None


def geocode_nominatim(query: str) -> tuple | None:
    """Geocode a place in Hagen; returns (lat, lon) or None (best effort)."""
    try:
        resp = httpx.get(NOMINATIM_URL, params={
            "q": f"{query}, Hagen",
            "format": "json",
            "limit": 1,
            "viewbox": HAGEN_BBOX,
            "bounded": 1,
        }, headers={"User-Agent": USER_AGENT}, timeout=10)
        results = resp.json()
        if results:
            return (float(results[0]["lat"]), float(results[0]["lon"]))
    except Exception:
        # Best-effort lookup: any network/parse failure means "not found".
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
        pass
    return None


def process_vorlage(vorlage_id: int, aktenzeichen: str, volltext: str) -> bool:
    """Summarize one Vorlage via Qwen and persist the result.

    Returns True on success, False when the API call or the DB write failed.
    """
    # Qwen Plus has 128k context, but beyond ~30k chars adds little value.
    text = volltext[:30000]
    prompt = PROMPT_TEMPLATE.format(volltext=text)

    result = call_qwen(prompt)
    if not result:
        stats["failed"] += 1
        return False

    # Persist to DB — single writer at a time to avoid SQLite lock contention.
    with db_lock:
        conn = get_db()
        try:
            conn.execute("""
                INSERT INTO ki_bewertungen
                    (vorlage_id, typ, begruendung, anmerkungen, modell, prompt_version)
                VALUES (?, 'zusammenfassung', ?, ?, 'qwen-plus-latest', 'v2-parallel')
            """, (
                vorlage_id,
                result.get("zusammenfassung", ""),
                json.dumps(result, ensure_ascii=False),
            ))

            # Topic + status on the vorlagen table.
            conn.execute("""
                UPDATE vorlagen SET thema_kurz = ?, ki_status = 'done' WHERE id = ?
            """, (result.get("thema"), vorlage_id))

            # Store extracted places. Geocoding itself is deferred to a separate
            # pass (Nominatim allows only ~1 request/s), so `geocoded` stays 0 here.
            geocoded = 0
            for ort in result.get("orte", []):
                rohtext = ort.get("rohtext", "")
                geocodierbar = ort.get("geocodierbar", False)
                geocode_query = ort.get("geocode_query")
                typ = ort.get("typ", "sonstiges")
                kontext = ort.get("kontext", "")
                lat, lon = None, None
                status = 'pending' if geocodierbar else 'skipped'
                cursor = conn.execute("""
                    INSERT INTO orte
                        (name, typ, lat, lon, rohtext, kontext_satz, geocode_status, vorlage_count)
                    VALUES (?, ?, ?, ?, ?, ?, ?, 1)
                """, (geocode_query or rohtext, typ, lat, lon, rohtext, kontext[:500], status))
                conn.execute("""
                    INSERT OR IGNORE INTO vorlagen_orte (vorlage_id, ort_id, kontext)
                    VALUES (?, ?, ?)
                """, (vorlage_id, cursor.lastrowid, kontext[:500]))

            conn.commit()
        except Exception as e:
            log(f" DB-Fehler {aktenzeichen}: {e}")
            stats["failed"] += 1
            return False
        finally:
            # Single close point for both outcomes (was duplicated per branch).
            conn.close()

    stats["success"] += 1
    orte_count = len(result.get("orte", []))
    log(f" ✓ {aktenzeichen}: {result.get('thema', '?')} ({orte_count} Orte, {geocoded} geocodiert)")
    return True


def main():
    """Fetch one batch of unprocessed Vorlagen and summarize them in parallel.

    Exit code 0 when everything is done, 1 while work remains (or on a missing
    API key), so a shell loop can keep re-invoking the script until finished.
    """
    parser = argparse.ArgumentParser(description="Parallele KI-Zusammenfassungen")
    parser.add_argument("--workers", type=int, default=5, help="Parallele API-Calls")
    parser.add_argument("--batch-size", type=int, default=100, help="Batch-Größe")
    args = parser.parse_args()

    log(f"=== Parallele KI-Zusammenfassung ===")
    log(f"Workers: {args.workers}, Batch: {args.batch_size}")

    if not DASHSCOPE_KEY:
        log("FEHLER: Kein API-Key!")
        return 1

    conn = get_db()
    # Count pending rows (uses the ki_status flag instead of a JOIN).
    remaining = conn.execute("""
        SELECT COUNT(*) FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
          AND ki_status IS NULL
    """).fetchone()[0]
    log(f"Noch zu verarbeiten: {remaining}")

    if remaining == 0:
        log("Alle fertig!")
        conn.close()
        return 0

    # Fetch the batch, newest first.
    vorlagen = conn.execute("""
        SELECT id, aktenzeichen, volltext_clean FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
          AND ki_status IS NULL
        ORDER BY datum_eingang DESC
        LIMIT ?
    """, (args.batch_size,)).fetchall()
    conn.close()

    log(f"Batch: {len(vorlagen)} Vorlagen\n")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {}
        for v in vorlagen:
            future = executor.submit(
                process_vorlage, v['id'], v['aktenzeichen'], v['volltext_clean']
            )
            futures[future] = v['aktenzeichen']

        for future in as_completed(futures):
            ak = futures[future]
            try:
                future.result()
            except Exception as e:
                log(f" ✗ {ak}: {e}")

    elapsed = time.time() - start_time
    docs_per_sec = stats["success"] / max(elapsed, 1)
    remaining_after = remaining - stats["success"]

    log(f"\n=== Batch fertig ===")
    log(f"✓ {stats['success']} | ✗ {stats['failed']} | ⏳ {stats['throttled']} throttled")
    log(f"Dauer: {elapsed:.0f}s | {docs_per_sec:.2f} docs/sec")
    log(f"Verbleibend: {remaining_after}")
    log(f"ETA: {remaining_after / max(docs_per_sec, 0.01) / 3600:.1f}h")

    return 0 if remaining_after == 0 else 1


if __name__ == "__main__":
    sys.exit(main())