Vollständige Pipeline zur Analyse kommunaler Vorlagen aus ALLRIS: - OParl-Import: 20.149 Vorlagen - PDF-Extraktion: 10.045 Volltexte (adaptives Throttling) - KI-Zusammenfassungen: 10.026 via Qwen Plus (parallelisiert) - Beratungsfolge-Scraper: Beschlusstexte + Wortprotokolle - Abstimmungs-Analyse mit Koalitionsmatrix - Georeferenzierung (Nominatim) Stack: FastAPI + SvelteKit + SQLite Deployment: Docker + Traefik auf VServer Daten (DB, Logs) nicht im Repo — siehe Restic-Backup. Repo-Setup: scripts/setup.sh für Neuaufbau aus OParl-API.
280 lines
8.9 KiB
Python
280 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
KI-gestützte Ortsextraktion aus Volltexten.
|
|
Zweistufiger Prozess:
|
|
1. Extraktion aller Ortsangaben mit Kontext
|
|
2. Intelligente Georeferenzierung mit Kontextverständnis
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
|
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
|
|
|
|
DASHSCOPE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1/chat/completions"
|
|
DASHSCOPE_KEY = os.environ.get("QWEN_API_KEY") or os.popen("security find-generic-password -s qwen-api -w 2>/dev/null").read().strip()
|
|
|
|
# Nominatim für Geocoding
|
|
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
|
|
USER_AGENT = "Antragstracker-Hagen/1.0"
|
|
HAGEN_BBOX = "7.35,51.30,7.65,51.45"
|
|
|
|
EXTRACTION_PROMPT = """Extrahiere ALLE geografischen Ortsangaben aus diesem kommunalpolitischen Dokument aus Hagen.
|
|
|
|
DOKUMENT:
|
|
{volltext}
|
|
|
|
---
|
|
|
|
Gib eine Liste aller Orte zurück, die im Text erwähnt werden. Für jeden Ort:
|
|
- rohtext: Die genaue Formulierung im Text
|
|
- kontext: Der Satz oder Absatz, in dem der Ort erwähnt wird
|
|
- typ: strasse|platz|stadtteil|gebaeude|sonstiges
|
|
- geocodierbar: true/false (kann man das auf einer Karte finden?)
|
|
- geocode_query: Falls geocodierbar, der beste Suchbegriff für Nominatim (z.B. bei "Polizeiwache an der Boeler Straße" → "Boeler Straße")
|
|
|
|
JSON-Format:
|
|
{{
|
|
"orte": [
|
|
{{
|
|
"rohtext": "Altenhagener Brücke",
|
|
"kontext": "Der Abschnitt ab der Altenhagener Brücke bis zum Aldi",
|
|
"typ": "strasse",
|
|
"geocodierbar": true,
|
|
"geocode_query": "Altenhagener Brücke, Hagen"
|
|
}},
|
|
{{
|
|
"rohtext": "Spielplatz",
|
|
"kontext": "Darüber hinaus befindet sich ein Spielplatz",
|
|
"typ": "gebaeude",
|
|
"geocodierbar": false,
|
|
"geocode_query": null
|
|
}}
|
|
]
|
|
}}
|
|
|
|
WICHTIG:
|
|
- Extrahiere ALLE Orte, auch generische
|
|
- Bei "X an der Y-Straße" ist Y-Straße der geocode_query
|
|
- Stadtteile wie "Altenhagen", "Haspe" sind geocodierbar
|
|
- Generische Begriffe wie "Schule", "Spielplatz" ohne Straßenangabe sind NICHT geocodierbar
|
|
|
|
NUR JSON, keine Erklärungen."""
|
|
|
|
|
|
GEOCODE_REFINEMENT_PROMPT = """Du bist ein Geocoding-Experte für die Stadt Hagen (NRW).
|
|
|
|
Ich habe folgende Ortsangaben aus einem kommunalpolitischen Dokument extrahiert:
|
|
{orte_json}
|
|
|
|
Der Volltext-Kontext war:
|
|
{kontext}
|
|
|
|
Nominatim hat für "{query}" folgende Ergebnisse in Hagen gefunden:
|
|
{nominatim_results}
|
|
|
|
Welches Ergebnis passt am besten zum Kontext? Antworte mit der Nummer (1, 2, 3...) oder "keins" wenn keins passt.
|
|
Nur die Nummer oder "keins", keine Erklärung."""
|
|
|
|
|
|
def get_db():
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def call_qwen(prompt: str, model: str = "qwen-turbo-latest") -> dict | str | None:
|
|
"""Ruft Qwen API auf."""
|
|
if not DASHSCOPE_KEY:
|
|
return None
|
|
|
|
try:
|
|
resp = httpx.post(
|
|
DASHSCOPE_URL,
|
|
headers={"Authorization": f"Bearer {DASHSCOPE_KEY}", "Content-Type": "application/json"},
|
|
json={"model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
|
|
timeout=60
|
|
)
|
|
resp.raise_for_status()
|
|
content = resp.json()["choices"][0]["message"]["content"]
|
|
|
|
# JSON extrahieren wenn vorhanden
|
|
if "```json" in content:
|
|
content = content.split("```json")[1].split("```")[0]
|
|
elif "```" in content:
|
|
parts = content.split("```")
|
|
if len(parts) >= 2:
|
|
content = parts[1]
|
|
|
|
try:
|
|
return json.loads(content.strip())
|
|
except json.JSONDecodeError:
|
|
return content.strip()
|
|
|
|
except Exception as e:
|
|
print(f" API-Fehler: {e}")
|
|
return None
|
|
|
|
|
|
def geocode_nominatim(client: httpx.Client, query: str) -> list[dict]:
|
|
"""Sucht mit Nominatim in Hagen."""
|
|
try:
|
|
resp = client.get(
|
|
NOMINATIM_URL,
|
|
params={"q": f"{query}, Hagen, Germany", "format": "json", "limit": 3,
|
|
"viewbox": HAGEN_BBOX, "bounded": 1},
|
|
headers={"User-Agent": USER_AGENT},
|
|
timeout=10
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except Exception as e:
|
|
print(f" Nominatim-Fehler: {e}")
|
|
return []
|
|
|
|
|
|
def process_vorlage(conn: sqlite3.Connection, client: httpx.Client, vorlage: dict) -> int:
|
|
"""Extrahiert und geocodiert Orte aus einer Vorlage."""
|
|
vid = vorlage['id']
|
|
akz = vorlage['aktenzeichen'] or f"#{vid}"
|
|
volltext = vorlage['volltext_clean']
|
|
|
|
if not volltext or len(volltext) < 100:
|
|
return 0
|
|
|
|
# Volltext kürzen
|
|
volltext_short = volltext[:6000] if len(volltext) > 6000 else volltext
|
|
|
|
# Schritt 1: KI-Extraktion
|
|
prompt = EXTRACTION_PROMPT.format(volltext=volltext_short)
|
|
result = call_qwen(prompt)
|
|
|
|
if not result or not isinstance(result, dict) or 'orte' not in result:
|
|
print(f" {akz}: Keine Orte extrahiert")
|
|
return 0
|
|
|
|
orte = result['orte']
|
|
print(f" {akz}: {len(orte)} Orte gefunden")
|
|
|
|
# Schritt 2: Geocoding für geocodierbare Orte
|
|
success = 0
|
|
for ort in orte:
|
|
rohtext = ort.get('rohtext', '')
|
|
kontext = ort.get('kontext', '')
|
|
typ = ort.get('typ', 'sonstiges')
|
|
geocodierbar = ort.get('geocodierbar', False)
|
|
geocode_query = ort.get('geocode_query')
|
|
|
|
if not rohtext:
|
|
continue
|
|
|
|
# Prüfen ob schon existiert
|
|
existing = conn.execute(
|
|
"SELECT id FROM orte WHERE name = ? OR rohtext = ?",
|
|
(rohtext, rohtext)
|
|
).fetchone()
|
|
|
|
if existing:
|
|
# Nur Verknüpfung erstellen
|
|
conn.execute("""
|
|
INSERT OR IGNORE INTO vorlagen_orte (vorlage_id, ort_id, kontext)
|
|
VALUES (?, ?, ?)
|
|
""", (vid, existing['id'], kontext[:500]))
|
|
conn.execute("UPDATE orte SET vorlage_count = vorlage_count + 1 WHERE id = ?", (existing['id'],))
|
|
conn.commit()
|
|
continue
|
|
|
|
# Neuen Ort anlegen
|
|
lat, lon = None, None
|
|
status = 'skipped'
|
|
|
|
if geocodierbar and geocode_query:
|
|
time.sleep(1.1) # Nominatim Rate Limit
|
|
results = geocode_nominatim(client, geocode_query)
|
|
|
|
if results:
|
|
# Ersten Treffer nehmen (könnte mit KI verfeinert werden)
|
|
lat = float(results[0]['lat'])
|
|
lon = float(results[0]['lon'])
|
|
status = 'success'
|
|
print(f" ✓ {rohtext} → ({lat:.4f}, {lon:.4f})")
|
|
else:
|
|
status = 'failed'
|
|
print(f" ✗ {rohtext} (nicht gefunden)")
|
|
else:
|
|
print(f" ⊘ {rohtext} (nicht geocodierbar)")
|
|
|
|
cursor = conn.execute("""
|
|
INSERT INTO orte (name, typ, lat, lon, rohtext, kontext_satz, geocode_status, vorlage_count)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, 1)
|
|
""", (geocode_query or rohtext, typ, lat, lon, rohtext, kontext[:500], status))
|
|
|
|
ort_id = cursor.lastrowid
|
|
conn.execute("""
|
|
INSERT OR IGNORE INTO vorlagen_orte (vorlage_id, ort_id, kontext)
|
|
VALUES (?, ?, ?)
|
|
""", (vid, ort_id, kontext[:500]))
|
|
conn.commit()
|
|
|
|
if lat:
|
|
success += 1
|
|
|
|
return success
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="KI-gestützte Ortsextraktion")
|
|
parser.add_argument("--limit", type=int, default=10, help="Max. Anzahl Vorlagen")
|
|
parser.add_argument("--vorlage", type=int, help="Einzelne Vorlage-ID")
|
|
args = parser.parse_args()
|
|
|
|
print(f"=== KI-Ortsextraktion ===\n")
|
|
|
|
conn = get_db()
|
|
client = httpx.Client()
|
|
|
|
if args.vorlage:
|
|
query = "SELECT id, aktenzeichen, volltext_clean FROM vorlagen WHERE id = ?"
|
|
params = [args.vorlage]
|
|
else:
|
|
# Vorlagen mit Volltext die noch nicht verarbeitet wurden
|
|
query = """
|
|
SELECT v.id, v.aktenzeichen, v.volltext_clean
|
|
FROM vorlagen v
|
|
WHERE v.volltext_clean IS NOT NULL
|
|
AND v.id NOT IN (SELECT DISTINCT vorlage_id FROM vorlagen_orte)
|
|
ORDER BY v.datum_eingang DESC
|
|
LIMIT ?
|
|
"""
|
|
params = [args.limit]
|
|
|
|
vorlagen = conn.execute(query, params).fetchall()
|
|
print(f"Verarbeite {len(vorlagen)} Vorlagen\n")
|
|
|
|
total_success = 0
|
|
for v in vorlagen:
|
|
total_success += process_vorlage(conn, client, dict(v))
|
|
|
|
client.close()
|
|
|
|
# Stats
|
|
total_orte = conn.execute("SELECT COUNT(*) FROM orte").fetchone()[0]
|
|
geocoded = conn.execute("SELECT COUNT(*) FROM orte WHERE lat IS NOT NULL").fetchone()[0]
|
|
conn.close()
|
|
|
|
print(f"\n=== Fertig ===")
|
|
print(f"Orte gesamt: {total_orte}")
|
|
print(f"Geocodiert: {geocoded}")
|
|
print(f"Diese Runde: {total_success} neue geocodiert")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|