#!/usr/bin/env python3 """ OParl-Importer für den Antragstracker Hagen. Liest alle Papers von der ALLRIS OParl-API und speichert sie in die SQLite-DB. Nutzung: python scripts/import_oparl.py # Voll-Import (alle Seiten) python scripts/import_oparl.py --resume 337 # Ab Seite 337 weitermachen python scripts/import_oparl.py --incremental # Nur neue Papers (stoppt bei bekannten) python scripts/import_oparl.py --limit 100 # Nur 100 Papers (Test) python scripts/import_oparl.py --resolve-gremien # Nur Gremien-Namen auflösen """ import argparse import functools import re import sqlite3 import sys import time from pathlib import Path import httpx # Unbuffered print für Live-Fortschritt print = functools.partial(print, flush=True) OPARL_BASE = "https://allris.hagen.de/public/oparl" PAPERS_URL = f"{OPARL_BASE}/papers" ORGS_URL = f"{OPARL_BASE}/organizations" PROJECT_ROOT = Path(__file__).resolve().parent.parent SCHEMA_PATH = PROJECT_ROOT / "backend" / "src" / "tracker" / "db" / "schema.sql" DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db" # OParl paperType → interner Typ PAPER_TYPE_MAP = { "Anfrage": "anfrage", "Antrag": "antrag", "Beschlussvorlage": "beschlussvorlage", "Beschlussvorlage BBM": "beschlussvorlage", "Beschlussvorlage WBH": "beschlussvorlage", "Mitteilungsvorlage": "mitteilungsvorlage", "Mitteilung": "mitteilungsvorlage", "Mitteilung WBH": "mitteilungsvorlage", "Stellungnahme": "stellungnahme", "Berichtsvorlage": "bericht", "Vorschlag zur Tagesordnung": "antrag", "Dringlichkeitsantrag": "antrag", "Dringlichkeitsanfrage": "anfrage", "Änderungsantrag": "aenderungsantrag", "Ergänzungsantrag": "ergaenzungsantrag", "Bericht": "bericht", "Resolution": "resolution", } # Regex für Aktenzeichen-Parsing AKZ_RE = re.compile(r"^(\d+/\d+)(?:-(\d+))?$") # Gremien-Typ aus OParl classification ableiten GREMIUM_TYP_MAP = { "Rat": "rat", "Bezirksvertretung": "bv", "Ausschuss": "ausschuss", "Beirat": "beirat", } def init_db() -> sqlite3.Connection: """Erstellt die DB und führt schema.sql aus.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) conn.execute("PRAGMA journal_mode = WAL") conn.execute("PRAGMA foreign_keys = ON") conn.row_factory = sqlite3.Row schema = SCHEMA_PATH.read_text(encoding="utf-8") conn.executescript(schema) # Schema-Migration: oparl_id in beratungen (für Dedup) cols = {r[1] for r in conn.execute("PRAGMA table_info(beratungen)").fetchall()} if "oparl_id" not in cols: conn.execute("ALTER TABLE beratungen ADD COLUMN oparl_id TEXT") conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_beratungen_oparl ON beratungen(oparl_id)") print(" Migration: oparl_id zu beratungen hinzugefügt") conn.commit() return conn def fetch_page(client: httpx.Client, url: str, params: dict, max_retries: int = 3) -> dict | None: """Holt eine API-Seite mit Retry-Logik.""" for attempt in range(max_retries): try: resp = client.get(url, params=params, timeout=30) resp.raise_for_status() return resp.json() except httpx.TimeoutException: print(f" Timeout, Versuch {attempt + 1}/{max_retries}") if attempt < max_retries - 1: time.sleep(5 * (attempt + 1)) except Exception as e: print(f" Fehler: {e}") if attempt < max_retries - 1: time.sleep(3) return None def parse_aktenzeichen(reference: str | None) -> tuple[str | None, str | None, str | None]: """Zerlegt ein Aktenzeichen in (aktenzeichen, basis, suffix).""" if not reference: return None, None, None m = AKZ_RE.match(reference.strip()) if m: basis = m.group(1) suffix = m.group(2) return reference.strip(), basis, f"-{suffix}" if suffix else None return reference.strip(), reference.strip(), None def map_paper_type(oparl_type: str | None) -> str: if not oparl_type: return "sonstig" return PAPER_TYPE_MAP.get(oparl_type, "sonstig") def is_verwaltungsvorlage(paper_type: str | None) -> bool: if not paper_type: return False return paper_type.startswith("Beschlussvorlage") or paper_type.startswith("Mitteilung") def upsert_paper(conn: sqlite3.Connection, paper: dict) -> tuple[int | None, bool]: """Fügt ein Paper ein oder aktualisiert es. Returns (vorlage_id, is_new) — is_new=False means it already existed. """ oparl_id = paper.get("id") if not oparl_id: return None, False # Check if already exists existing = conn.execute( "SELECT id FROM vorlagen WHERE oparl_id = ?", (oparl_id,) ).fetchone() reference = paper.get("reference") aktenzeichen, basis, suffix = parse_aktenzeichen(reference) oparl_type = paper.get("paperType") typ = map_paper_type(oparl_type) betreff = paper.get("name", "") datum = paper.get("date") web_url = paper.get("web") main_file = paper.get("mainFile") pdf_url = None if isinstance(main_file, dict): pdf_url = main_file.get("accessUrl") or main_file.get("downloadUrl") try: if existing: vorlage_id = existing["id"] conn.execute( """UPDATE vorlagen SET aktenzeichen = ?, aktenzeichen_basis = ?, aktenzeichen_suffix = ?, typ = ?, betreff = ?, datum_eingang = ?, pdf_url = ?, web_url = ?, ist_verwaltungsvorlage = ?, scraped_at = CURRENT_TIMESTAMP WHERE id = ?""", (aktenzeichen, basis, suffix, typ, betreff, datum, pdf_url, web_url, is_verwaltungsvorlage(oparl_type), vorlage_id), ) return vorlage_id, False else: cur = conn.execute( """INSERT INTO vorlagen (oparl_id, aktenzeichen, aktenzeichen_basis, aktenzeichen_suffix, typ, betreff, datum_eingang, pdf_url, web_url, ist_verwaltungsvorlage) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (oparl_id, aktenzeichen, basis, suffix, typ, betreff, datum, pdf_url, web_url, is_verwaltungsvorlage(oparl_type)), ) return cur.lastrowid, True except sqlite3.Error as e: print(f" DB-Fehler bei {reference}: {e}") return None, False def upsert_consultations(conn: sqlite3.Connection, vorlage_id: int, paper: dict): """Speichert die Beratungsfolge mit Dedup über oparl_id.""" consultations = paper.get("consultation") or [] for cons in consultations: if not isinstance(cons, dict): continue cons_oparl_id = cons.get("id") # Gremium auflösen orgs = cons.get("organization") or [] gremium_id = None for org_url in orgs: if not isinstance(org_url, str): continue # Nur typ=gr sind echte Gremien if "typ=gr" not in org_url: continue conn.execute( "INSERT OR IGNORE INTO gremien (oparl_id, name) VALUES (?, ?)", (org_url, _org_placeholder_name(org_url)), ) row = conn.execute( "SELECT id FROM gremien WHERE oparl_id = ?", (org_url,) ).fetchone() if row: gremium_id = row["id"] rolle = cons.get("role") authoritative = cons.get("authoritative", False) if cons_oparl_id: conn.execute( """INSERT INTO beratungen (oparl_id, vorlage_id, gremium_id, rolle) VALUES (?, ?, ?, ?) ON CONFLICT(oparl_id) DO UPDATE SET vorlage_id = excluded.vorlage_id, gremium_id = excluded.gremium_id, rolle = excluded.rolle""", (cons_oparl_id, vorlage_id, gremium_id, rolle), ) else: # Fallback ohne oparl_id: prüfe auf Duplikat exists = conn.execute( """SELECT 1 FROM beratungen WHERE vorlage_id = ? AND gremium_id IS ? AND rolle IS ? LIMIT 1""", (vorlage_id, gremium_id, rolle), ).fetchone() if not exists: conn.execute( "INSERT INTO beratungen (vorlage_id, gremium_id, rolle) VALUES (?, ?, ?)", (vorlage_id, gremium_id, rolle), ) def insert_files(conn: sqlite3.Connection, vorlage_id: int, paper: dict): """Speichert Anlagen eines Papers (nur neue).""" aux_files = paper.get("auxiliaryFile") or [] for f in aux_files: if not isinstance(f, dict): continue url = f.get("accessUrl") or f.get("downloadUrl") name = f.get("name") or f.get("fileName", "") if url: exists = conn.execute( "SELECT 1 FROM anlagen WHERE vorlage_id = ? AND url = ?", (vorlage_id, url), ).fetchone() if not exists: conn.execute( "INSERT INTO anlagen (vorlage_id, dateiname, url) VALUES (?, ?, ?)", (vorlage_id, name, url), ) def _org_placeholder_name(org_url: str) -> str: """Extrahiere Platzhalter-Name aus URL.""" org_id = org_url.split("id=")[-1] if "id=" in org_url else org_url return f"Gremium {org_id}" def build_suffix_references(conn: sqlite3.Connection): """Erstellt automatische Suffix-Referenzen.""" print("\nErstelle Suffix-Referenzen...") # Parent → Child conn.execute( """INSERT OR IGNORE INTO referenzen (quelle_id, ziel_id, typ, konfidenz) SELECT parent.id, child.id, 'suffix', 1.0 FROM vorlagen child JOIN vorlagen parent ON child.aktenzeichen_basis = parent.aktenzeichen_basis WHERE child.aktenzeichen_suffix IS NOT NULL AND parent.aktenzeichen_suffix IS NULL AND child.id != parent.id""" ) # Sequential: -1 → -2, -2 → -3, etc. conn.execute( """INSERT OR IGNORE INTO referenzen (quelle_id, ziel_id, typ, konfidenz) SELECT earlier.id, later.id, 'suffix', 1.0 FROM vorlagen later JOIN vorlagen earlier ON later.aktenzeichen_basis = earlier.aktenzeichen_basis AND later.aktenzeichen_suffix IS NOT NULL AND earlier.aktenzeichen_suffix IS NOT NULL AND CAST(REPLACE(later.aktenzeichen_suffix, '-', '') AS INTEGER) = CAST(REPLACE(earlier.aktenzeichen_suffix, '-', '') AS INTEGER) + 1 WHERE later.id != earlier.id""" ) count = conn.execute("SELECT changes()").fetchone()[0] conn.commit() print(f" Suffix-Referenzen aktualisiert (letzte Runde: {count} neue)") def resolve_gremien(conn: sqlite3.Connection, client: httpx.Client): """Löst Gremien-Namen über die OParl Organizations-API auf. Aktualisiert name, kuerzel und typ für alle Gremien mit Platzhalter-Namen. """ print("\nLöse Gremien-Namen auf...") # Alle Organisationen von der API holen org_lookup: dict[str, dict] = {} page = 1 while True: data = fetch_page(client, ORGS_URL, {"body": 1, "page": page}) if not data or "data" not in data: break for org in data["data"]: oparl_id = org.get("id") if oparl_id: classification = org.get("classification", "") typ = GREMIUM_TYP_MAP.get(classification, "sonstig") org_lookup[oparl_id] = { "name": org.get("name", ""), "kuerzel": org.get("shortName", ""), "typ": typ, "classification": classification, } total_pages = data.get("pagination", {}).get("totalPages", page) if page >= total_pages: break page += 1 time.sleep(0.2) print(f" {len(org_lookup)} Organisationen von API geladen") # Gremien in DB aktualisieren updated = 0 for row in conn.execute("SELECT id, oparl_id, name FROM gremien").fetchall(): oparl_id = row["oparl_id"] if oparl_id in org_lookup: org = org_lookup[oparl_id] if org["name"] and org["name"] != row["name"]: conn.execute( "UPDATE gremien SET name = ?, kuerzel = ?, typ = ? WHERE id = ?", (org["name"], org["kuerzel"] or None, org["typ"], row["id"]), ) updated += 1 conn.commit() print(f" {updated} Gremien aktualisiert") def import_papers(conn: sqlite3.Connection, client: httpx.Client, start_page: int = 1, limit: int = 0, incremental: bool = False): """Importiert Papers von der OParl-API. Args: start_page: Erste Seite (für --resume) limit: Max. Anzahl Papers (0 = alle) incremental: Stoppt wenn nur bereits bekannte Papers gefunden werden """ # Pagination ermitteln print("Ermittle Seitenanzahl...") first = fetch_page(client, PAPERS_URL, {"body": 1, "page": 1}) if not first or "pagination" not in first: print("FEHLER: Konnte API nicht erreichen") return total_pages = first["pagination"]["totalPages"] total_elements = first["pagination"]["totalElements"] print(f" {total_elements} Papers auf {total_pages} Seiten") existing_count = conn.execute("SELECT COUNT(*) FROM vorlagen").fetchone()[0] print(f" {existing_count} bereits in DB\n") total_new = 0 total_updated = 0 consecutive_known_pages = 0 # Für --incremental Abbruch for page_num in range(start_page, total_pages + 1): if page_num == 1 and start_page == 1: data = first else: data = fetch_page(client, PAPERS_URL, {"body": 1, "page": page_num}) if not data or "data" not in data: print(f" Seite {page_num} übersprungen (kein Data)") continue papers = data["data"] page_new = 0 for paper in papers: vorlage_id, is_new = upsert_paper(conn, paper) if vorlage_id: upsert_consultations(conn, vorlage_id, paper) insert_files(conn, vorlage_id, paper) if is_new: page_new += 1 total_new += 1 else: total_updated += 1 conn.commit() progress = (page_num / total_pages) * 100 marker = f" (+{page_new} neu)" if page_new > 0 else " (bekannt)" print(f" Seite {page_num:4d}/{total_pages} ({progress:5.1f}%)" f" — {len(papers)} Papers{marker}" f" — neu: {total_new}, aktualisiert: {total_updated}") # Incremental: Abbruch wenn 3 Seiten hintereinander nur bekannte Papers if incremental: if page_new == 0: consecutive_known_pages += 1 if consecutive_known_pages >= 3: print(f"\n Inkrementell: 3 Seiten ohne neue Papers, stoppe.") break else: consecutive_known_pages = 0 if limit and (total_new + total_updated) >= limit: print(f"\n Limit von {limit} erreicht, stoppe.") break # Schonende Pause time.sleep(0.3) if page_num % 100 == 0: print(f" Checkpoint Seite {page_num} — Pause 5s...") time.sleep(5) return total_new, total_updated def print_stats(conn: sqlite3.Connection): """Gibt aktuelle DB-Statistiken aus.""" print(f"\n=== Datenbank-Statistiken ===") print(f" Vorlagen: {conn.execute('SELECT COUNT(*) FROM vorlagen').fetchone()[0]}") print(f" Beratungen: {conn.execute('SELECT COUNT(*) FROM beratungen').fetchone()[0]}") print(f" Gremien: {conn.execute('SELECT COUNT(*) FROM gremien').fetchone()[0]}") print(f" Referenzen: {conn.execute('SELECT COUNT(*) FROM referenzen').fetchone()[0]}") print(f" Anlagen: {conn.execute('SELECT COUNT(*) FROM anlagen').fetchone()[0]}") print(f"\n Vorlagen nach Typ:") for r in conn.execute("SELECT typ, COUNT(*) c FROM vorlagen GROUP BY typ ORDER BY c DESC"): print(f" {r['typ']:25s} {r['c']:>6d}") print(f"\n Zeitraum: {conn.execute('SELECT MIN(datum_eingang) FROM vorlagen').fetchone()[0]}" f" bis {conn.execute('SELECT MAX(datum_eingang) FROM vorlagen').fetchone()[0]}") def main(): parser = argparse.ArgumentParser(description="OParl-Import für Antragstracker Hagen") parser.add_argument("--resume", type=int, default=0, help="Ab dieser Seitennummer weitermachen") parser.add_argument("--incremental", action="store_true", help="Nur neue Papers (stoppt bei bekannten)") parser.add_argument("--limit", type=int, default=0, help="Max. Anzahl Papers (0 = alle)") parser.add_argument("--resolve-gremien", action="store_true", help="Nur Gremien-Namen auflösen, kein Paper-Import") parser.add_argument("--no-references", action="store_true", help="Suffix-Referenzen nicht neu bauen") args = parser.parse_args() print("=== Antragstracker Hagen — OParl-Import ===\n") conn = init_db() print(f" DB: {DB_PATH}\n") client = httpx.Client( headers={"Accept": "application/json"}, follow_redirects=True, ) try: if args.resolve_gremien: resolve_gremien(conn, client) print_stats(conn) return # Paper-Import start_page = args.resume if args.resume > 0 else 1 if args.resume: print(f"Setze Import ab Seite {start_page} fort...\n") elif args.incremental: print("Inkrementeller Import (nur neue Papers)...\n") else: print("Voll-Import...\n") import_papers(conn, client, start_page, args.limit, args.incremental) # Gremien-Namen auflösen resolve_gremien(conn, client) # Suffix-Referenzen if not args.no_references: build_suffix_references(conn) print_stats(conn) finally: client.close() conn.close() if __name__ == "__main__": main()