Vollständige Pipeline zur Analyse kommunaler Vorlagen aus ALLRIS: - OParl-Import: 20.149 Vorlagen - PDF-Extraktion: 10.045 Volltexte (adaptives Throttling) - KI-Zusammenfassungen: 10.026 via Qwen Plus (parallelisiert) - Beratungsfolge-Scraper: Beschlusstexte + Wortprotokolle - Abstimmungs-Analyse mit Koalitionsmatrix - Georeferenzierung (Nominatim) Stack: FastAPI + SvelteKit + SQLite Deployment: Docker + Traefik auf VServer Daten (DB, Logs) nicht im Repo — siehe Restic-Backup. Repo-Setup: scripts/setup.sh für Neuaufbau aus OParl-API.
294 lines
9.3 KiB
Python
294 lines
9.3 KiB
Python
#!/usr/bin/env python3
"""
Parallelized AI summaries via Qwen/DashScope.

Uses a ThreadPoolExecutor for concurrent API calls.
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
|
|
import httpx
|
|
|
|
# Paths are resolved relative to the repo root (this script lives one level below it).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
LOG_FILE = PROJECT_ROOT / "data" / "ki_parallel.log"
STATE_FILE = PROJECT_ROOT / "data" / "ki_parallel_state.json"

# DashScope API (OpenAI-compatible chat-completions endpoint for Qwen).
DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions"
# Key from the environment, falling back to the macOS keychain; `security` is
# macOS-only, so on other systems without QWEN_API_KEY this ends up "".
DASHSCOPE_KEY = os.environ.get("QWEN_API_KEY") or os.popen("security find-generic-password -s qwen-api -w 2>/dev/null").read().strip()

# Nominatim for geocoding (custom User-Agent identifies the client).
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
USER_AGENT = "Antragstracker-Hagen/1.0"
# Bounding box around Hagen: "min_lon,min_lat,max_lon,max_lat" (Nominatim viewbox order).
HAGEN_BBOX = "7.35,51.30,7.65,51.45"
|
|
|
|
PROMPT_TEMPLATE = """Analysiere diesen kommunalpolitischen Antrag aus Hagen.
|
|
|
|
DOKUMENT:
|
|
{volltext}
|
|
|
|
---
|
|
|
|
Erstelle eine strukturierte Zusammenfassung im JSON-Format:
|
|
|
|
{{
|
|
"zusammenfassung": "2-3 Sätze, was gefordert wird",
|
|
"kernforderung": "Die zentrale Forderung in einem Satz",
|
|
"begruendung": "Warum wird das gefordert? (kurz)",
|
|
"thema": "Hauptthema (z.B. Verkehr, Soziales, Umwelt)",
|
|
"partei": "Antragstellende Fraktion falls erkennbar",
|
|
"orte": [
|
|
{{
|
|
"rohtext": "Die genaue Formulierung im Text",
|
|
"kontext": "Der Satz in dem der Ort erwähnt wird",
|
|
"typ": "strasse|platz|stadtteil|gebaeude|sonstiges",
|
|
"geocodierbar": true/false,
|
|
"geocode_query": "Suchbegriff für Karte"
|
|
}}
|
|
]
|
|
}}
|
|
|
|
NUR JSON ausgeben, keine Erklärungen."""
|
|
|
|
# Serialize SQLite writes and log-file appends across worker threads.
db_lock = Lock()
log_lock = Lock()
# Shared counters, mutated from worker threads.
# NOTE(review): `stats[k] += 1` is a read-modify-write and not atomic under
# concurrency — counts could theoretically drift slightly; acceptable for logging.
stats = {"success": 0, "failed": 0, "throttled": 0}
|
|
|
|
|
|
def log(msg: str) -> None:
    """Print *msg* with a HH:MM:SS timestamp and append it to LOG_FILE.

    Thread-safe: log_lock serializes file appends from concurrent workers.
    """
    timestamp = time.strftime("%H:%M:%S")
    line = f"[{timestamp}] {msg}"
    print(line)
    with log_lock:
        # Explicit UTF-8: messages contain non-ASCII symbols (✓/✗/⏳) that
        # would crash under a platform-default ASCII/locale encoding.
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
|
|
|
|
|
|
def get_db():
    """Open a connection to the tracker SQLite database.

    check_same_thread=False allows use across threads; callers are expected
    to serialize access themselves (see db_lock).
    """
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    # Row factory gives dict-like access (row["col"]) instead of bare tuples.
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def call_qwen(prompt: str, max_retries: int = 5) -> dict | None:
    """Call the Qwen chat API with fast, frequent retries.

    Retries on rate limiting (429), HTTP errors, timeouts and malformed
    JSON. Returns the parsed JSON dict, or None once the retry budget is
    exhausted (the last error is logged instead of silently dropped).
    """
    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            resp = httpx.post(
                DASHSCOPE_URL,
                headers={
                    "Authorization": f"Bearer {DASHSCOPE_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "qwen-plus-latest",
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1,
                    "max_tokens": 2000,
                    "response_format": {"type": "json_object"},
                },
                timeout=30
            )

            if resp.status_code == 429:
                # Rate limited: linear backoff 1s, 2s, ... capped at 5s.
                wait = min(5, attempt + 1)
                stats["throttled"] += 1
                time.sleep(wait)
                continue

            resp.raise_for_status()

            content = resp.json()["choices"][0]["message"]["content"]
            content = content.strip()
            # Strip Markdown code fences the model sometimes wraps around JSON.
            if content.startswith("```"):
                content = re.sub(r'^```\w*\n?', '', content)
                content = re.sub(r'\n?```$', '', content)

            return json.loads(content)

        except json.JSONDecodeError as e:
            last_error = e
            time.sleep(0.5)
        except httpx.HTTPStatusError as e:
            last_error = e
            time.sleep(1)
        except httpx.TimeoutException as e:
            last_error = e
            time.sleep(0.5)
        except Exception as e:  # defensive: unexpected network/parse failures
            last_error = e
            time.sleep(0.5)

    # Was hard-coded "5 Retries" — report the actual budget and last error.
    log(f" ✗ {max_retries} Retries fehlgeschlagen ({last_error})")
    return None
|
|
|
|
|
|
def geocode_nominatim(query: str) -> tuple | None:
    """Geocode a place within Hagen via Nominatim.

    Returns (lat, lon) of the best match restricted to HAGEN_BBOX, or
    None when nothing matches or the request fails (best-effort).
    """
    try:
        resp = httpx.get(NOMINATIM_URL, params={
            "q": f"{query}, Hagen",
            "format": "json",
            "limit": 1,
            "viewbox": HAGEN_BBOX,
            "bounded": 1,  # hard-restrict hits to the viewbox
        }, headers={"User-Agent": USER_AGENT}, timeout=10)

        results = resp.json()
        if results:
            return (float(results[0]["lat"]), float(results[0]["lon"]))
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit — still best-effort, but only for genuine errors now.
        pass
    return None
|
|
|
|
|
|
def process_vorlage(vorlage_id: int, aktenzeichen: str, volltext: str) -> bool:
    """Summarize a single Vorlage via Qwen and persist the result.

    Inserts the summary into ki_bewertungen, marks the Vorlagen row done,
    and stores extracted places. Returns True on success; updates the
    shared `stats` counters either way.
    """
    # Qwen Plus offers 128k context, but beyond ~30k chars there is rarely
    # extra signal — truncate to keep calls fast and cheap.
    text = volltext[:30000]
    prompt = PROMPT_TEMPLATE.format(volltext=text)

    result = call_qwen(prompt)
    if not result:
        stats["failed"] += 1
        return False

    # Persist to the DB; db_lock serializes SQLite writes across workers.
    with db_lock:
        conn = get_db()
        try:
            conn.execute("""
                INSERT INTO ki_bewertungen
                (vorlage_id, typ, begruendung, anmerkungen, modell, prompt_version)
                VALUES (?, 'zusammenfassung', ?, ?, 'qwen-plus-latest', 'v2-parallel')
            """, (
                vorlage_id,
                result.get("zusammenfassung", ""),
                json.dumps(result, ensure_ascii=False),  # full JSON kept for audit
            ))

            # Mirror topic + processing status onto the Vorlagen row.
            conn.execute("""
                UPDATE vorlagen SET thema_kurz = ?, ki_status = 'done' WHERE id = ?
            """, (result.get("thema"), vorlage_id))

            # Store extracted places. Actual geocoding runs in a separate
            # pass (Nominatim allows ~1 request/s), so geocoded stays 0 here.
            geocoded = 0
            for ort in result.get("orte", []):
                rohtext = ort.get("rohtext", "")
                geocodierbar = ort.get("geocodierbar", False)
                geocode_query = ort.get("geocode_query")
                typ = ort.get("typ", "sonstiges")
                kontext = ort.get("kontext", "")

                lat, lon = None, None
                status = 'pending' if geocodierbar else 'skipped'

                cursor = conn.execute("""
                    INSERT INTO orte (name, typ, lat, lon, rohtext, kontext_satz, geocode_status, vorlage_count)
                    VALUES (?, ?, ?, ?, ?, ?, ?, 1)
                """, (geocode_query or rohtext, typ, lat, lon, rohtext, kontext[:500], status))

                conn.execute("""
                    INSERT OR IGNORE INTO vorlagen_orte (vorlage_id, ort_id, kontext)
                    VALUES (?, ?, ?)
                """, (vorlage_id, cursor.lastrowid, kontext[:500]))

            conn.commit()

            stats["success"] += 1
            orte_count = len(result.get("orte", []))
            log(f" ✓ {aktenzeichen}: {result.get('thema', '?')} ({orte_count} Orte, {geocoded} geocodiert)")
            return True

        except Exception as e:
            conn.rollback()  # discard the partial transaction explicitly
            log(f" DB-Fehler {aktenzeichen}: {e}")
            stats["failed"] += 1
            return False
        finally:
            # Previously closed on each branch separately; a failure in log()
            # or between close() and return could leak the connection.
            conn.close()
|
|
|
|
|
|
def main():
    """Process one batch of pending Vorlagen in parallel.

    Returns 0 when nothing remains afterwards, 1 when further batches are
    left (or no API key is configured), so a shell loop can re-invoke the
    script until everything is done.
    """
    parser = argparse.ArgumentParser(description="Parallele KI-Zusammenfassungen")
    parser.add_argument("--workers", type=int, default=5, help="Parallele API-Calls")
    parser.add_argument("--batch-size", type=int, default=100, help="Batch-Größe")
    args = parser.parse_args()

    log("=== Parallele KI-Zusammenfassung ===")
    log(f"Workers: {args.workers}, Batch: {args.batch_size}")

    if not DASHSCOPE_KEY:
        log("FEHLER: Kein API-Key!")
        return 1

    conn = get_db()

    # Count what is still open (cheap ki_status flag instead of a JOIN).
    remaining = conn.execute("""
        SELECT COUNT(*) FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
        AND ki_status IS NULL
    """).fetchone()[0]

    log(f"Noch zu verarbeiten: {remaining}")

    if remaining == 0:
        log("Alle fertig!")
        conn.close()
        return 0

    # Fetch one batch, newest documents first.
    batch = conn.execute("""
        SELECT id, aktenzeichen, volltext_clean
        FROM vorlagen
        WHERE volltext_clean IS NOT NULL AND volltext_clean != ''
        AND ki_status IS NULL
        ORDER BY datum_eingang DESC
        LIMIT ?
    """, (args.batch_size,)).fetchall()

    conn.close()

    log(f"Batch: {len(batch)} Vorlagen\n")

    started = time.time()

    # Fan out over the worker pool; map each future back to its Aktenzeichen
    # so failures can be attributed in the log.
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        pending = {
            pool.submit(process_vorlage, row['id'], row['aktenzeichen'], row['volltext_clean']): row['aktenzeichen']
            for row in batch
        }
        for finished in as_completed(pending):
            aktenzeichen = pending[finished]
            try:
                finished.result()
            except Exception as exc:
                log(f" ✗ {aktenzeichen}: {exc}")

    elapsed = time.time() - started
    docs_per_sec = stats["success"] / max(elapsed, 1)
    remaining_after = remaining - stats["success"]

    log("\n=== Batch fertig ===")
    log(f"✓ {stats['success']} | ✗ {stats['failed']} | ⏳ {stats['throttled']} throttled")
    log(f"Dauer: {elapsed:.0f}s | {docs_per_sec:.2f} docs/sec")
    log(f"Verbleibend: {remaining_after}")
    log(f"ETA: {remaining_after / max(docs_per_sec, 0.01) / 3600:.1f}h")

    return 0 if remaining_after == 0 else 1
|
|
|
|
|
|
# Propagate main()'s exit code to the shell (non-zero = work remains or error),
# so an outer loop can re-run the script until it returns 0.
if __name__ == "__main__":
    sys.exit(main())
|