antragstracker/scripts/import_oparl.py

504 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
OParl-Importer für den Antragstracker Hagen.
Liest alle Papers von der ALLRIS OParl-API und speichert sie in die SQLite-DB.
Nutzung:
python scripts/import_oparl.py # Voll-Import (alle Seiten)
python scripts/import_oparl.py --resume 337 # Ab Seite 337 weitermachen
python scripts/import_oparl.py --incremental # Nur neue Papers (stoppt bei bekannten)
python scripts/import_oparl.py --limit 100 # Nur 100 Papers (Test)
python scripts/import_oparl.py --resolve-gremien # Nur Gremien-Namen auflösen
"""
import argparse
import functools
import re
import sqlite3
import sys
import time
from pathlib import Path
import httpx
# Unbuffered print für Live-Fortschritt
print = functools.partial(print, flush=True)
OPARL_BASE = "https://allris.hagen.de/public/oparl"
PAPERS_URL = f"{OPARL_BASE}/papers"
ORGS_URL = f"{OPARL_BASE}/organizations"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
SCHEMA_PATH = PROJECT_ROOT / "backend" / "src" / "tracker" / "db" / "schema.sql"
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
# OParl paperType → interner Typ
PAPER_TYPE_MAP = {
"Anfrage": "anfrage",
"Antrag": "antrag",
"Beschlussvorlage": "beschlussvorlage",
"Beschlussvorlage BBM": "beschlussvorlage",
"Beschlussvorlage WBH": "beschlussvorlage",
"Mitteilungsvorlage": "mitteilungsvorlage",
"Mitteilung": "mitteilungsvorlage",
"Mitteilung WBH": "mitteilungsvorlage",
"Stellungnahme": "stellungnahme",
"Berichtsvorlage": "bericht",
"Vorschlag zur Tagesordnung": "antrag",
"Dringlichkeitsantrag": "antrag",
"Dringlichkeitsanfrage": "anfrage",
"Änderungsantrag": "aenderungsantrag",
"Ergänzungsantrag": "ergaenzungsantrag",
"Bericht": "bericht",
"Resolution": "resolution",
}
# Regex für Aktenzeichen-Parsing
AKZ_RE = re.compile(r"^(\d+/\d+)(?:-(\d+))?$")
# Gremien-Typ aus OParl classification ableiten
GREMIUM_TYP_MAP = {
"Rat": "rat",
"Bezirksvertretung": "bv",
"Ausschuss": "ausschuss",
"Beirat": "beirat",
}
def init_db() -> sqlite3.Connection:
"""Erstellt die DB und führt schema.sql aus."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
conn.row_factory = sqlite3.Row
schema = SCHEMA_PATH.read_text(encoding="utf-8")
conn.executescript(schema)
# Schema-Migration: oparl_id in beratungen (für Dedup)
cols = {r[1] for r in conn.execute("PRAGMA table_info(beratungen)").fetchall()}
if "oparl_id" not in cols:
conn.execute("ALTER TABLE beratungen ADD COLUMN oparl_id TEXT")
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_beratungen_oparl ON beratungen(oparl_id)")
print(" Migration: oparl_id zu beratungen hinzugefügt")
conn.commit()
return conn
def fetch_page(client: httpx.Client, url: str, params: dict,
max_retries: int = 3) -> dict | None:
"""Holt eine API-Seite mit Retry-Logik."""
for attempt in range(max_retries):
try:
resp = client.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json()
except httpx.TimeoutException:
print(f" Timeout, Versuch {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
time.sleep(5 * (attempt + 1))
except Exception as e:
print(f" Fehler: {e}")
if attempt < max_retries - 1:
time.sleep(3)
return None
def parse_aktenzeichen(reference: str | None) -> tuple[str | None, str | None, str | None]:
"""Zerlegt ein Aktenzeichen in (aktenzeichen, basis, suffix)."""
if not reference:
return None, None, None
m = AKZ_RE.match(reference.strip())
if m:
basis = m.group(1)
suffix = m.group(2)
return reference.strip(), basis, f"-{suffix}" if suffix else None
return reference.strip(), reference.strip(), None
def map_paper_type(oparl_type: str | None) -> str:
if not oparl_type:
return "sonstig"
return PAPER_TYPE_MAP.get(oparl_type, "sonstig")
def is_verwaltungsvorlage(paper_type: str | None) -> bool:
if not paper_type:
return False
return paper_type.startswith("Beschlussvorlage") or paper_type.startswith("Mitteilung")
def upsert_paper(conn: sqlite3.Connection, paper: dict) -> tuple[int | None, bool]:
"""Fügt ein Paper ein oder aktualisiert es.
Returns (vorlage_id, is_new) is_new=False means it already existed.
"""
oparl_id = paper.get("id")
if not oparl_id:
return None, False
# Check if already exists
existing = conn.execute(
"SELECT id FROM vorlagen WHERE oparl_id = ?", (oparl_id,)
).fetchone()
reference = paper.get("reference")
aktenzeichen, basis, suffix = parse_aktenzeichen(reference)
oparl_type = paper.get("paperType")
typ = map_paper_type(oparl_type)
betreff = paper.get("name", "")
datum = paper.get("date")
web_url = paper.get("web")
main_file = paper.get("mainFile")
pdf_url = None
if isinstance(main_file, dict):
pdf_url = main_file.get("accessUrl") or main_file.get("downloadUrl")
try:
if existing:
vorlage_id = existing["id"]
conn.execute(
"""UPDATE vorlagen SET
aktenzeichen = ?, aktenzeichen_basis = ?, aktenzeichen_suffix = ?,
typ = ?, betreff = ?, datum_eingang = ?,
pdf_url = ?, web_url = ?, ist_verwaltungsvorlage = ?,
scraped_at = CURRENT_TIMESTAMP
WHERE id = ?""",
(aktenzeichen, basis, suffix, typ, betreff, datum,
pdf_url, web_url, is_verwaltungsvorlage(oparl_type), vorlage_id),
)
return vorlage_id, False
else:
cur = conn.execute(
"""INSERT INTO vorlagen
(oparl_id, aktenzeichen, aktenzeichen_basis, aktenzeichen_suffix,
typ, betreff, datum_eingang, pdf_url, web_url, ist_verwaltungsvorlage)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(oparl_id, aktenzeichen, basis, suffix,
typ, betreff, datum, pdf_url, web_url,
is_verwaltungsvorlage(oparl_type)),
)
return cur.lastrowid, True
except sqlite3.Error as e:
print(f" DB-Fehler bei {reference}: {e}")
return None, False
def upsert_consultations(conn: sqlite3.Connection, vorlage_id: int, paper: dict):
"""Speichert die Beratungsfolge mit Dedup über oparl_id."""
consultations = paper.get("consultation") or []
for cons in consultations:
if not isinstance(cons, dict):
continue
cons_oparl_id = cons.get("id")
# Gremium auflösen
orgs = cons.get("organization") or []
gremium_id = None
for org_url in orgs:
if not isinstance(org_url, str):
continue
# Nur typ=gr sind echte Gremien
if "typ=gr" not in org_url:
continue
conn.execute(
"INSERT OR IGNORE INTO gremien (oparl_id, name) VALUES (?, ?)",
(org_url, _org_placeholder_name(org_url)),
)
row = conn.execute(
"SELECT id FROM gremien WHERE oparl_id = ?", (org_url,)
).fetchone()
if row:
gremium_id = row["id"]
rolle = cons.get("role")
authoritative = cons.get("authoritative", False)
if cons_oparl_id:
conn.execute(
"""INSERT INTO beratungen (oparl_id, vorlage_id, gremium_id, rolle)
VALUES (?, ?, ?, ?)
ON CONFLICT(oparl_id) DO UPDATE SET
vorlage_id = excluded.vorlage_id,
gremium_id = excluded.gremium_id,
rolle = excluded.rolle""",
(cons_oparl_id, vorlage_id, gremium_id, rolle),
)
else:
# Fallback ohne oparl_id: prüfe auf Duplikat
exists = conn.execute(
"""SELECT 1 FROM beratungen
WHERE vorlage_id = ? AND gremium_id IS ? AND rolle IS ?
LIMIT 1""",
(vorlage_id, gremium_id, rolle),
).fetchone()
if not exists:
conn.execute(
"INSERT INTO beratungen (vorlage_id, gremium_id, rolle) VALUES (?, ?, ?)",
(vorlage_id, gremium_id, rolle),
)
def insert_files(conn: sqlite3.Connection, vorlage_id: int, paper: dict):
"""Speichert Anlagen eines Papers (nur neue)."""
aux_files = paper.get("auxiliaryFile") or []
for f in aux_files:
if not isinstance(f, dict):
continue
url = f.get("accessUrl") or f.get("downloadUrl")
name = f.get("name") or f.get("fileName", "")
if url:
exists = conn.execute(
"SELECT 1 FROM anlagen WHERE vorlage_id = ? AND url = ?",
(vorlage_id, url),
).fetchone()
if not exists:
conn.execute(
"INSERT INTO anlagen (vorlage_id, dateiname, url) VALUES (?, ?, ?)",
(vorlage_id, name, url),
)
def _org_placeholder_name(org_url: str) -> str:
"""Extrahiere Platzhalter-Name aus URL."""
org_id = org_url.split("id=")[-1] if "id=" in org_url else org_url
return f"Gremium {org_id}"
def build_suffix_references(conn: sqlite3.Connection):
"""Erstellt automatische Suffix-Referenzen."""
print("\nErstelle Suffix-Referenzen...")
# Parent → Child
conn.execute(
"""INSERT OR IGNORE INTO referenzen (quelle_id, ziel_id, typ, konfidenz)
SELECT parent.id, child.id, 'suffix', 1.0
FROM vorlagen child
JOIN vorlagen parent ON child.aktenzeichen_basis = parent.aktenzeichen_basis
WHERE child.aktenzeichen_suffix IS NOT NULL
AND parent.aktenzeichen_suffix IS NULL
AND child.id != parent.id"""
)
# Sequential: -1 → -2, -2 → -3, etc.
conn.execute(
"""INSERT OR IGNORE INTO referenzen (quelle_id, ziel_id, typ, konfidenz)
SELECT earlier.id, later.id, 'suffix', 1.0
FROM vorlagen later
JOIN vorlagen earlier
ON later.aktenzeichen_basis = earlier.aktenzeichen_basis
AND later.aktenzeichen_suffix IS NOT NULL
AND earlier.aktenzeichen_suffix IS NOT NULL
AND CAST(REPLACE(later.aktenzeichen_suffix, '-', '') AS INTEGER)
= CAST(REPLACE(earlier.aktenzeichen_suffix, '-', '') AS INTEGER) + 1
WHERE later.id != earlier.id"""
)
count = conn.execute("SELECT changes()").fetchone()[0]
conn.commit()
print(f" Suffix-Referenzen aktualisiert (letzte Runde: {count} neue)")
def resolve_gremien(conn: sqlite3.Connection, client: httpx.Client):
"""Löst Gremien-Namen über die OParl Organizations-API auf.
Aktualisiert name, kuerzel und typ für alle Gremien mit Platzhalter-Namen.
"""
print("\nLöse Gremien-Namen auf...")
# Alle Organisationen von der API holen
org_lookup: dict[str, dict] = {}
page = 1
while True:
data = fetch_page(client, ORGS_URL, {"body": 1, "page": page})
if not data or "data" not in data:
break
for org in data["data"]:
oparl_id = org.get("id")
if oparl_id:
classification = org.get("classification", "")
typ = GREMIUM_TYP_MAP.get(classification, "sonstig")
org_lookup[oparl_id] = {
"name": org.get("name", ""),
"kuerzel": org.get("shortName", ""),
"typ": typ,
"classification": classification,
}
total_pages = data.get("pagination", {}).get("totalPages", page)
if page >= total_pages:
break
page += 1
time.sleep(0.2)
print(f" {len(org_lookup)} Organisationen von API geladen")
# Gremien in DB aktualisieren
updated = 0
for row in conn.execute("SELECT id, oparl_id, name FROM gremien").fetchall():
oparl_id = row["oparl_id"]
if oparl_id in org_lookup:
org = org_lookup[oparl_id]
if org["name"] and org["name"] != row["name"]:
conn.execute(
"UPDATE gremien SET name = ?, kuerzel = ?, typ = ? WHERE id = ?",
(org["name"], org["kuerzel"] or None, org["typ"], row["id"]),
)
updated += 1
conn.commit()
print(f" {updated} Gremien aktualisiert")
def import_papers(conn: sqlite3.Connection, client: httpx.Client,
start_page: int = 1, limit: int = 0, incremental: bool = False):
"""Importiert Papers von der OParl-API.
Args:
start_page: Erste Seite (für --resume)
limit: Max. Anzahl Papers (0 = alle)
incremental: Stoppt wenn nur bereits bekannte Papers gefunden werden
"""
# Pagination ermitteln
print("Ermittle Seitenanzahl...")
first = fetch_page(client, PAPERS_URL, {"body": 1, "page": 1})
if not first or "pagination" not in first:
print("FEHLER: Konnte API nicht erreichen")
return
total_pages = first["pagination"]["totalPages"]
total_elements = first["pagination"]["totalElements"]
print(f" {total_elements} Papers auf {total_pages} Seiten")
existing_count = conn.execute("SELECT COUNT(*) FROM vorlagen").fetchone()[0]
print(f" {existing_count} bereits in DB\n")
total_new = 0
total_updated = 0
consecutive_known_pages = 0 # Für --incremental Abbruch
for page_num in range(start_page, total_pages + 1):
if page_num == 1 and start_page == 1:
data = first
else:
data = fetch_page(client, PAPERS_URL, {"body": 1, "page": page_num})
if not data or "data" not in data:
print(f" Seite {page_num} übersprungen (kein Data)")
continue
papers = data["data"]
page_new = 0
for paper in papers:
vorlage_id, is_new = upsert_paper(conn, paper)
if vorlage_id:
upsert_consultations(conn, vorlage_id, paper)
insert_files(conn, vorlage_id, paper)
if is_new:
page_new += 1
total_new += 1
else:
total_updated += 1
conn.commit()
progress = (page_num / total_pages) * 100
marker = f" (+{page_new} neu)" if page_new > 0 else " (bekannt)"
print(f" Seite {page_num:4d}/{total_pages} ({progress:5.1f}%)"
f"{len(papers)} Papers{marker}"
f" — neu: {total_new}, aktualisiert: {total_updated}")
# Incremental: Abbruch wenn 3 Seiten hintereinander nur bekannte Papers
if incremental:
if page_new == 0:
consecutive_known_pages += 1
if consecutive_known_pages >= 3:
print(f"\n Inkrementell: 3 Seiten ohne neue Papers, stoppe.")
break
else:
consecutive_known_pages = 0
if limit and (total_new + total_updated) >= limit:
print(f"\n Limit von {limit} erreicht, stoppe.")
break
# Schonende Pause
time.sleep(0.3)
if page_num % 100 == 0:
print(f" Checkpoint Seite {page_num} — Pause 5s...")
time.sleep(5)
return total_new, total_updated
def print_stats(conn: sqlite3.Connection):
"""Gibt aktuelle DB-Statistiken aus."""
print(f"\n=== Datenbank-Statistiken ===")
print(f" Vorlagen: {conn.execute('SELECT COUNT(*) FROM vorlagen').fetchone()[0]}")
print(f" Beratungen: {conn.execute('SELECT COUNT(*) FROM beratungen').fetchone()[0]}")
print(f" Gremien: {conn.execute('SELECT COUNT(*) FROM gremien').fetchone()[0]}")
print(f" Referenzen: {conn.execute('SELECT COUNT(*) FROM referenzen').fetchone()[0]}")
print(f" Anlagen: {conn.execute('SELECT COUNT(*) FROM anlagen').fetchone()[0]}")
print(f"\n Vorlagen nach Typ:")
for r in conn.execute("SELECT typ, COUNT(*) c FROM vorlagen GROUP BY typ ORDER BY c DESC"):
print(f" {r['typ']:25s} {r['c']:>6d}")
print(f"\n Zeitraum: {conn.execute('SELECT MIN(datum_eingang) FROM vorlagen').fetchone()[0]}"
f" bis {conn.execute('SELECT MAX(datum_eingang) FROM vorlagen').fetchone()[0]}")
def main():
parser = argparse.ArgumentParser(description="OParl-Import für Antragstracker Hagen")
parser.add_argument("--resume", type=int, default=0,
help="Ab dieser Seitennummer weitermachen")
parser.add_argument("--incremental", action="store_true",
help="Nur neue Papers (stoppt bei bekannten)")
parser.add_argument("--limit", type=int, default=0,
help="Max. Anzahl Papers (0 = alle)")
parser.add_argument("--resolve-gremien", action="store_true",
help="Nur Gremien-Namen auflösen, kein Paper-Import")
parser.add_argument("--no-references", action="store_true",
help="Suffix-Referenzen nicht neu bauen")
args = parser.parse_args()
print("=== Antragstracker Hagen — OParl-Import ===\n")
conn = init_db()
print(f" DB: {DB_PATH}\n")
client = httpx.Client(
headers={"Accept": "application/json"},
follow_redirects=True,
)
try:
if args.resolve_gremien:
resolve_gremien(conn, client)
print_stats(conn)
return
# Paper-Import
start_page = args.resume if args.resume > 0 else 1
if args.resume:
print(f"Setze Import ab Seite {start_page} fort...\n")
elif args.incremental:
print("Inkrementeller Import (nur neue Papers)...\n")
else:
print("Voll-Import...\n")
import_papers(conn, client, start_page, args.limit, args.incremental)
# Gremien-Namen auflösen
resolve_gremien(conn, client)
# Suffix-Referenzen
if not args.no_references:
build_suffix_references(conn)
print_stats(conn)
finally:
client.close()
conn.close()
if __name__ == "__main__":
main()