#!/usr/bin/env python3
"""
Parallelisierte KI-Zusammenfassungen via Qwen/DashScope.
Nutzt ThreadPoolExecutor für parallele API-Calls.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
import httpx
# Project layout: this script lives in <root>/scripts/, data files in <root>/data/.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
LOG_FILE = PROJECT_ROOT / "data" / "ki_parallel.log"
STATE_FILE = PROJECT_ROOT / "data" / "ki_parallel_state.json"
# DashScope API (OpenAI-compatible chat-completions endpoint for Qwen).
DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions"
# API key: environment variable first, macOS keychain lookup as fallback
# (the shell command prints nothing off-macOS, yielding an empty key).
DASHSCOPE_KEY = os.environ.get("QWEN_API_KEY") or os.popen("security find-generic-password -s qwen-api -w 2>/dev/null").read().strip()
# Nominatim for geocoding.
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
USER_AGENT = "Antragstracker-Hagen/1.0"
# Bounding box around Hagen; presumably lon_min,lat_min,lon_max,lat_max as
# Nominatim's viewbox expects — TODO confirm against the Nominatim docs.
HAGEN_BBOX = "7.35,51.30,7.65,51.45"
# Prompt sent to the model; {volltext} is filled via str.format(), so the
# literal JSON braces in the template are doubled ({{ }}).
PROMPT_TEMPLATE = """Analysiere diesen kommunalpolitischen Antrag aus Hagen.
DOKUMENT:
{volltext}
---
Erstelle eine strukturierte Zusammenfassung im JSON-Format:
{{
"zusammenfassung": "2-3 Sätze, was gefordert wird",
"kernforderung": "Die zentrale Forderung in einem Satz",
"begruendung": "Warum wird das gefordert? (kurz)",
"thema": "Hauptthema (z.B. Verkehr, Soziales, Umwelt)",
"partei": "Antragstellende Fraktion falls erkennbar",
"orte": [
{{
"rohtext": "Die genaue Formulierung im Text",
"kontext": "Der Satz in dem der Ort erwähnt wird",
"typ": "strasse|platz|stadtteil|gebaeude|sonstiges",
"geocodierbar": true/false,
"geocode_query": "Suchbegriff für Karte"
}}
]
}}
NUR JSON ausgeben, keine Erklärungen."""
# Shared mutable state; worker threads update these concurrently, so DB and
# log writes are serialized through the corresponding lock.
db_lock = Lock()
log_lock = Lock()
stats = {"success": 0, "failed": 0, "throttled": 0}
def log(msg: str) -> None:
    """Print *msg* with a timestamp and append it to LOG_FILE (thread-safe)."""
    timestamp = time.strftime("%H:%M:%S")
    line = f"[{timestamp}] {msg}"
    print(line)
    with log_lock:
        # Explicit UTF-8: log lines contain non-ASCII symbols ("✗", "⏳"),
        # which would raise UnicodeEncodeError under e.g. cp1252 defaults.
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
def get_db() -> sqlite3.Connection:
    """Open a connection to the tracker DB with name-based row access.

    check_same_thread=False because connections are used from worker threads
    (writes are serialized via db_lock by the callers).
    """
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def call_qwen(prompt: str, max_retries: int = 5) -> dict | None:
    """Call the Qwen API with fast, frequent retries.

    Returns the parsed JSON object from the model reply, or None after
    *max_retries* failed attempts (parse errors, HTTP errors, timeouts).
    """
    for attempt in range(max_retries):
        try:
            resp = httpx.post(
                DASHSCOPE_URL,
                headers={
                    "Authorization": f"Bearer {DASHSCOPE_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "qwen-plus-latest",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1,
                    "max_tokens": 2000,
                    "response_format": {"type": "json_object"},
                },
                timeout=30
            )
            if resp.status_code == 429:
                # Rate limited: back off linearly, capped at 5 seconds.
                wait = min(5, attempt + 1)
                stats["throttled"] += 1
                time.sleep(wait)
                continue
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            content = content.strip()
            # Strip a Markdown code fence if the model wrapped its JSON in one.
            if content.startswith("```"):
                content = re.sub(r'^```\w*\n?', '', content)
                content = re.sub(r'\n?```$', '', content)
            return json.loads(content)
        except json.JSONDecodeError:
            time.sleep(0.5)
        except httpx.HTTPStatusError:
            time.sleep(1)
        except httpx.TimeoutException:
            time.sleep(0.5)
        except Exception:
            time.sleep(0.5)
    # Was a hard-coded "5"; report the actual retry budget.
    log(f" ✗ {max_retries} Retries fehlgeschlagen")
    return None
def geocode_nominatim(query: str) -> tuple | None:
    """Geocode a place in Hagen; returns (lat, lon) floats or None.

    Best-effort: any failure (network, non-JSON body, missing keys) simply
    means "not geocodable" and yields None.
    """
    try:
        resp = httpx.get(NOMINATIM_URL, params={
            "q": f"{query}, Hagen",
            "format": "json",
            "limit": 1,
            "viewbox": HAGEN_BBOX,
            "bounded": 1,
        }, headers={"User-Agent": USER_AGENT}, timeout=10)
        results = resp.json()
        if results:
            return (float(results[0]["lat"]), float(results[0]["lon"]))
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; Exception keeps the best-effort behavior without that.
        pass
    return None
def process_vorlage(vorlage_id: int, aktenzeichen: str, volltext: str) -> bool:
    """Summarize one document via Qwen and persist the result.

    Returns True on success, False when the API call or DB write failed.
    Updates the module-level `stats` counters either way.
    """
    # Qwen Plus has a 128k context, but >30k chars rarely adds value.
    text = volltext[:30000]
    prompt = PROMPT_TEMPLATE.format(volltext=text)
    result = call_qwen(prompt)
    if not result:
        stats["failed"] += 1
        return False
    # Persist under the lock: one writer at a time keeps SQLite happy.
    with db_lock:
        conn = get_db()
        try:
            conn.execute("""
                INSERT INTO ki_bewertungen
                (vorlage_id, typ, begruendung, anmerkungen, modell, prompt_version)
                VALUES (?, 'zusammenfassung', ?, ?, 'qwen-plus-latest', 'v2-parallel')
            """, (
                vorlage_id,
                result.get("zusammenfassung", ""),
                json.dumps(result, ensure_ascii=False),
            ))
            # Topic + status on the vorlagen row itself.
            conn.execute("""
                UPDATE vorlagen SET thema_kurz = ?, ki_status = 'done' WHERE id = ?
            """, (result.get("thema"), vorlage_id))
            # Store extracted places. Geocoding itself runs separately
            # (Nominatim 1 req/s limit), hence `geocoded` stays 0 here.
            geocoded = 0
            for ort in result.get("orte", []):
                rohtext = ort.get("rohtext", "")
                geocodierbar = ort.get("geocodierbar", False)
                geocode_query = ort.get("geocode_query")
                typ = ort.get("typ", "sonstiges")
                kontext = ort.get("kontext", "")
                lat, lon = None, None
                status = 'pending' if geocodierbar else 'skipped'
                cursor = conn.execute("""
                    INSERT INTO orte (name, typ, lat, lon, rohtext, kontext_satz, geocode_status, vorlage_count)
                    VALUES (?, ?, ?, ?, ?, ?, ?, 1)
                """, (geocode_query or rohtext, typ, lat, lon, rohtext, kontext[:500], status))
                conn.execute("""
                    INSERT OR IGNORE INTO vorlagen_orte (vorlage_id, ort_id, kontext)
                    VALUES (?, ?, ?)
                """, (vorlage_id, cursor.lastrowid, kontext[:500]))
            conn.commit()
            stats["success"] += 1
            orte_count = len(result.get("orte", []))
            log(f"{aktenzeichen}: {result.get('thema', '?')} ({orte_count} Orte, {geocoded} geocodiert)")
            return True
        except Exception as e:
            conn.rollback()  # explicitly discard the partial transaction
            log(f" DB-Fehler {aktenzeichen}: {e}")
            stats["failed"] += 1
            return False
        finally:
            conn.close()  # was duplicated on both paths; finally guarantees it
def main() -> int:
    """Run one parallel summarization batch; returns 0 when nothing is left."""
    parser = argparse.ArgumentParser(description="Parallele KI-Zusammenfassungen")
    parser.add_argument("--workers", type=int, default=5, help="Parallele API-Calls")
    parser.add_argument("--batch-size", type=int, default=100, help="Batch-Größe")
    args = parser.parse_args()

    log("=== Parallele KI-Zusammenfassung ===")
    log(f"Workers: {args.workers}, Batch: {args.batch_size}")

    if not DASHSCOPE_KEY:
        log("FEHLER: Kein API-Key!")
        return 1

    conn = get_db()
    # Count the open items (via ki_status instead of a JOIN).
    remaining = conn.execute("""
        SELECT COUNT(*) FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
        AND ki_status IS NULL
    """).fetchone()[0]
    log(f"Noch zu verarbeiten: {remaining}")
    if not remaining:
        log("Alle fertig!")
        conn.close()
        return 0

    # Fetch one batch, newest documents first.
    batch = conn.execute("""
        SELECT id, aktenzeichen, volltext_clean
        FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
        AND ki_status IS NULL
        ORDER BY datum_eingang DESC
        LIMIT ?
    """, (args.batch_size,)).fetchall()
    conn.close()
    log(f"Batch: {len(batch)} Vorlagen\n")

    started = time.time()
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        # Map each future back to its case number for error reporting.
        pending = {
            pool.submit(process_vorlage, row['id'], row['aktenzeichen'], row['volltext_clean']): row['aktenzeichen']
            for row in batch
        }
        for done in as_completed(pending):
            try:
                done.result()
            except Exception as e:
                log(f"{pending[done]}: {e}")

    elapsed = time.time() - started
    docs_per_sec = stats["success"] / max(elapsed, 1)
    remaining_after = remaining - stats["success"]
    log("\n=== Batch fertig ===")
    log(f"{stats['success']} | ✗ {stats['failed']} | ⏳ {stats['throttled']} throttled")
    log(f"Dauer: {elapsed:.0f}s | {docs_per_sec:.2f} docs/sec")
    log(f"Verbleibend: {remaining_after}")
    log(f"ETA: {remaining_after / max(docs_per_sec, 0.01) / 3600:.1f}h")
    # Non-zero exit signals the calling loop that more batches remain.
    return 0 if remaining_after == 0 else 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())