antragstracker/scripts/sync_oparl.py

618 lines
22 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
OParl-Sync für den Antragstracker Hagen.
Führt einen inkrementellen Sync durch:
1. Neue Papers von der OParl-API importieren
2. Beratungsfolge für neue Vorlagen scrapen (ALLRIS)
3. Neue Vorlagen in Ketten einordnen (Suffix-Matching)
4. Status-Engine für betroffene Ketten laufen lassen
5. FTS5-Index aktualisieren
6. Zusammenfassung ausgeben
Nutzung:
python scripts/sync_oparl.py # Normaler Sync
python scripts/sync_oparl.py --dry-run # Nur zeigen was passieren würde
python scripts/sync_oparl.py --full # Vollständiger Re-Import
Cron-fähig: Exit 0 bei Erfolg, Logging nach stdout.
# Täglich 06:00: OParl-Sync
# 0 6 * * * cd /path/to/antragstracker && PYTHONPATH=src python3 scripts/sync_oparl.py >> data/sync.log 2>&1
"""
import argparse
import functools
import json
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path
# Unbuffered print
print = functools.partial(print, flush=True)
# Projekt-Root
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker.db"
SYNC_STATE_PATH = PROJECT_ROOT / "data" / "sync_state.json"
# Füge src zum Path hinzu für tracker-Imports
sys.path.insert(0, str(PROJECT_ROOT / "backend" / "src"))
def get_db() -> sqlite3.Connection:
"""Öffne DB-Verbindung."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
return conn
def load_sync_state() -> dict:
"""Lade letzten Sync-State."""
if SYNC_STATE_PATH.exists():
try:
return json.loads(SYNC_STATE_PATH.read_text())
except Exception:
pass
return {}
def save_sync_state(state: dict):
"""Speichere Sync-State."""
SYNC_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
SYNC_STATE_PATH.write_text(json.dumps(state, indent=2, ensure_ascii=False))
# ──────────────────────────────────────────────
# Phase 1: Inkrementeller OParl-Import
# ──────────────────────────────────────────────
def phase_import(conn: sqlite3.Connection, dry_run: bool = False, full: bool = False) -> list[int]:
"""Importiere neue Papers von der OParl-API.
Returns: Liste der neuen vorlage_ids.
"""
# Importiere die OParl-Parsing-Logik
sys.path.insert(0, str(PROJECT_ROOT / "scripts"))
import httpx
from import_oparl import (
PAPERS_URL,
fetch_page,
upsert_paper,
upsert_consultations,
insert_files,
build_suffix_references,
)
client = httpx.Client(
headers={"Accept": "application/json"},
follow_redirects=True,
)
try:
# Seitenanzahl ermitteln
first = fetch_page(client, PAPERS_URL, {"body": 1, "page": 1})
if not first or "pagination" not in first:
print(" FEHLER: Konnte OParl-API nicht erreichen")
return []
total_pages = first["pagination"]["totalPages"]
total_elements = first["pagination"]["totalElements"]
existing = conn.execute("SELECT COUNT(*) FROM vorlagen").fetchone()[0]
print(f" API: {total_elements} Papers auf {total_pages} Seiten, DB: {existing}")
if dry_run:
# Im Dry-Run: Nur erste Seite checken
new_ids = []
for paper in first.get("data", []):
oparl_id = paper.get("id")
if oparl_id:
exists = conn.execute(
"SELECT id FROM vorlagen WHERE oparl_id = ?", (oparl_id,)
).fetchone()
if not exists:
ref = paper.get("reference", "?")
name = paper.get("name", "?")[:60]
print(f" NEU: {ref}{name}")
new_ids.append(-1) # Placeholder
return new_ids
# Inkrementeller Import: von Seite 1, stoppe bei bekannten
new_ids = []
consecutive_known = 0
max_pages = total_pages if full else total_pages # Bei full alle Seiten
for page_num in range(1, max_pages + 1):
if page_num == 1:
data = first
else:
data = fetch_page(client, PAPERS_URL, {"body": 1, "page": page_num})
if not data or "data" not in data:
continue
page_new = 0
for paper in data["data"]:
vorlage_id, is_new = upsert_paper(conn, paper)
if vorlage_id:
upsert_consultations(conn, vorlage_id, paper)
insert_files(conn, vorlage_id, paper)
if is_new:
new_ids.append(vorlage_id)
page_new += 1
conn.commit()
if page_new > 0:
print(f" Seite {page_num}/{total_pages}: +{page_new} neue")
consecutive_known = 0
else:
consecutive_known += 1
# Inkrementell: nach 3 Seiten ohne neue aufhören
if not full and consecutive_known >= 3:
print(f" Stoppe nach {page_num} Seiten (3x ohne neue)")
break
time.sleep(0.3)
# Suffix-Referenzen aktualisieren
if new_ids:
build_suffix_references(conn)
return new_ids
finally:
client.close()
# ──────────────────────────────────────────────
# Phase 2: Beratungsfolge scrapen
# ──────────────────────────────────────────────
def phase_scrape(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> int:
"""Scrape Beratungsfolge für neue Vorlagen.
Returns: Anzahl gescrapte Vorlagen.
"""
if not new_ids:
return 0
from tracker.core.rescrape import _rescrape_vorlage_impl
scraped = 0
errors = 0
for vid in new_ids:
row = conn.execute(
"SELECT aktenzeichen, web_url FROM vorlagen WHERE id = ?", (vid,)
).fetchone()
if not row or not row["web_url"]:
continue
if dry_run:
print(f" Würde scrapen: {row['aktenzeichen']}")
scraped += 1
continue
try:
result = _rescrape_vorlage_impl(conn, vid)
n_ber = result.get("updated_beratungen", 0)
has_vt = result.get("updated_volltext", False)
errs = result.get("errors", [])
if errs:
errors += 1
print(f" ⚠️ {row['aktenzeichen']}: {n_ber} Beratungen, Fehler: {errs[0][:80]}")
elif n_ber > 0 or has_vt:
print(f"{row['aktenzeichen']}: {n_ber} Beratungen" +
(", Volltext extrahiert" if has_vt else ""))
scraped += 1
except Exception as e:
errors += 1
print(f"{row['aktenzeichen']}: {e}")
# Rate-Limiting: 1s zwischen ALLRIS-Requests
time.sleep(1.0)
conn.commit()
if errors:
print(f" {errors} Fehler beim Scrapen")
return scraped
# ──────────────────────────────────────────────
# Phase 3: Ketten-Zuordnung (Suffix-Matching)
# ──────────────────────────────────────────────
def phase_chains(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> dict:
"""Ordne neue Vorlagen in bestehende Ketten ein.
Returns: {"created": N, "extended": N, "affected_chain_ids": [...]}
"""
result = {"created": 0, "extended": 0, "affected_chain_ids": set()}
if not new_ids:
return result
for vid in new_ids:
row = conn.execute(
"SELECT id, aktenzeichen, aktenzeichen_basis, aktenzeichen_suffix, typ, betreff, datum_eingang "
"FROM vorlagen WHERE id = ?", (vid,)
).fetchone()
if not row:
continue
# Bereits in einer Kette?
already = conn.execute(
"SELECT kette_id FROM ketten_glieder WHERE vorlage_id = ?", (vid,)
).fetchone()
if already:
result["affected_chain_ids"].add(already["kette_id"])
continue
basis_az = row["aktenzeichen_basis"]
suffix = row["aktenzeichen_suffix"]
if not basis_az:
continue
# Nur Suffix-Dokumente in Ketten einordnen
if suffix:
# Finde Basis-Dokument
basis = conn.execute(
"SELECT id, aktenzeichen, typ, betreff FROM vorlagen "
"WHERE aktenzeichen_basis = ? AND (aktenzeichen_suffix IS NULL OR aktenzeichen_suffix = '')",
(basis_az,)
).fetchone()
if not basis:
continue
# Ist die Basis schon in einer Kette?
existing_chain = conn.execute(
"SELECT kette_id FROM ketten_glieder WHERE vorlage_id = ?",
(basis["id"],)
).fetchone()
if existing_chain:
kette_id = existing_chain["kette_id"]
if dry_run:
print(f" Würde {row['aktenzeichen']} → Kette {kette_id} hinzufügen")
else:
max_pos = conn.execute(
"SELECT MAX(position) as mp FROM ketten_glieder WHERE kette_id = ?",
(kette_id,)
).fetchone()["mp"] or 0
conn.execute(
"INSERT OR IGNORE INTO ketten_glieder (kette_id, vorlage_id, position, rolle) "
"VALUES (?, ?, ?, ?)",
(kette_id, vid, max_pos + 1, _rolle(row["typ"]))
)
print(f" {row['aktenzeichen']} → Kette {kette_id}")
result["extended"] += 1
result["affected_chain_ids"].add(kette_id)
else:
# Neue Kette erstellen
typ = _chain_type(basis["typ"])
if typ not in ("antrag", "anfrage"):
continue
if dry_run:
print(f" Würde neue Kette: {basis['aktenzeichen']} + {row['aktenzeichen']}")
else:
conn.execute(
"INSERT INTO ketten (ursprung_id, typ, thema, status, begruendung) "
"VALUES (?, ?, ?, 'eingereicht', ?)",
(basis["id"], typ, basis["betreff"],
f"Automatisch erstellt via Sync: {basis['aktenzeichen']} + {row['aktenzeichen']}")
)
kette_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute(
"INSERT INTO ketten_glieder (kette_id, vorlage_id, position, rolle) VALUES (?, ?, 0, 'ursprung')",
(kette_id, basis["id"])
)
conn.execute(
"INSERT INTO ketten_glieder (kette_id, vorlage_id, position, rolle) VALUES (?, ?, 1, ?)",
(kette_id, vid, _rolle(row["typ"]))
)
print(f" 🆕 Kette {kette_id}: {basis['aktenzeichen']} + {row['aktenzeichen']}")
result["affected_chain_ids"].add(kette_id)
result["created"] += 1
else:
# Basis-Dokument: Checke ob es Suffixe gibt die schon in Ketten sind
chain_row = conn.execute("""
SELECT kg.kette_id FROM ketten_glieder kg
JOIN vorlagen v ON kg.vorlage_id = v.id
WHERE v.aktenzeichen_basis = ? AND v.aktenzeichen_suffix IS NOT NULL
LIMIT 1
""", (basis_az,)).fetchone()
if chain_row:
kette_id = chain_row["kette_id"]
if dry_run:
print(f" Würde Basis {row['aktenzeichen']} → Kette {kette_id} (Pos. 0)")
else:
# Basis an Position 0 einfügen, Rest hochschieben
conn.execute(
"UPDATE ketten_glieder SET position = position + 1 WHERE kette_id = ?",
(kette_id,)
)
conn.execute(
"INSERT OR IGNORE INTO ketten_glieder (kette_id, vorlage_id, position, rolle) "
"VALUES (?, ?, 0, 'ursprung')",
(kette_id, vid)
)
conn.execute(
"UPDATE ketten SET ursprung_id = ? WHERE id = ?",
(vid, kette_id)
)
print(f" 📎 Basis {row['aktenzeichen']} → Kette {kette_id} (Pos. 0)")
result["extended"] += 1
result["affected_chain_ids"].add(kette_id)
if not dry_run:
conn.commit()
return result
def _rolle(typ):
if typ in ("stellungnahme", "bericht"):
return "antwort"
return "folge"
def _chain_type(typ):
if typ == "antrag":
return "antrag"
elif typ == "anfrage":
return "anfrage"
return "sonstig"
# ──────────────────────────────────────────────
# Phase 4: Status-Engine
# ──────────────────────────────────────────────
def phase_status(conn: sqlite3.Connection, chain_ids: set[int], dry_run: bool = False) -> int:
"""Aktualisiere Status für betroffene Ketten.
Returns: Anzahl aktualisierter Ketten.
"""
if not chain_ids:
return 0
from tracker.core.status import compute_status
updated = 0
for kette_id in chain_ids:
kette = conn.execute(
"SELECT id, ursprung_id, typ FROM ketten WHERE id = ?", (kette_id,)
).fetchone()
if not kette:
continue
members = conn.execute("""
SELECT v.id, v.aktenzeichen, v.aktenzeichen_suffix, v.typ,
v.datum_eingang, v.betreff
FROM ketten_glieder kg
JOIN vorlagen v ON kg.vorlage_id = v.id
WHERE kg.kette_id = ?
ORDER BY kg.position
""", (kette_id,)).fetchall()
if not members:
continue
status_info = compute_status(conn, kette["ursprung_id"], kette["typ"], members)
if dry_run:
print(f" Kette {kette_id}: Status → {status_info['status']}")
else:
dates = [m["datum_eingang"] for m in members if m["datum_eingang"]]
letzte_aktivitaet = max(dates) if dates else None
conn.execute("""
UPDATE ketten SET status = ?, status_seit = ?, letzte_aktivitaet = ?,
vertagungen_count = ?, begruendung = ?
WHERE id = ?
""", (
status_info["status"],
status_info.get("status_seit"),
letzte_aktivitaet,
status_info.get("vertagungen_count", 0),
status_info.get("begruendung"),
kette_id,
))
updated += 1
if not dry_run:
conn.commit()
return updated
# ──────────────────────────────────────────────
# Phase 5: FTS5-Index aktualisieren
# ──────────────────────────────────────────────
def phase_fts(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> int:
"""Aktualisiere FTS5-Index für neue Vorlagen.
Returns: Anzahl indexierter Vorlagen.
"""
if not new_ids:
return 0
# Prüfe ob FTS5-Tabelle existiert
exists = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vorlagen_fts'"
).fetchone()
if not exists:
print(" ⚠️ vorlagen_fts existiert nicht — FTS-Update übersprungen")
return 0
if dry_run:
return len(new_ids)
count = 0
for vid in new_ids:
row = conn.execute("""
SELECT v.id, v.aktenzeichen, v.betreff, v.volltext_clean,
kb.begruendung as zusammenfassung
FROM vorlagen v
LEFT JOIN ki_bewertungen kb ON kb.vorlage_id = v.id AND kb.typ = 'zusammenfassung'
WHERE v.id = ?
""", (vid,)).fetchone()
if not row:
continue
# Lösche ggf. alten Eintrag, dann neu einfügen
try:
conn.execute("INSERT INTO vorlagen_fts(vorlagen_fts, rowid, aktenzeichen, betreff, volltext, zusammenfassung) "
"VALUES('delete', ?, ?, ?, ?, ?)",
(row["id"], row["aktenzeichen"] or '', row["betreff"] or '',
row["volltext_clean"] or '', row["zusammenfassung"] or ''))
except Exception:
pass # Eintrag existierte nicht
conn.execute(
"INSERT INTO vorlagen_fts(rowid, aktenzeichen, betreff, volltext, zusammenfassung) "
"VALUES (?, ?, ?, ?, ?)",
(row["id"], row["aktenzeichen"] or '', row["betreff"] or '',
row["volltext_clean"] or '', row["zusammenfassung"] or '')
)
count += 1
conn.commit()
return count
# ──────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────
def sync(dry_run: bool = False, full: bool = False) -> dict:
"""Führe den kompletten Sync durch.
Returns: Zusammenfassung als dict.
"""
start = datetime.now()
print(f"{'' * 60}")
print(f" OParl-Sync — {start.strftime('%Y-%m-%d %H:%M:%S')}")
if dry_run:
print(" ⚡ DRY-RUN — keine Änderungen")
if full:
print(" 🔄 FULL — kompletter Re-Import")
print(f"{'' * 60}")
conn = get_db()
summary = {
"started_at": start.isoformat(),
"dry_run": dry_run,
"new_vorlagen": 0,
"scraped": 0,
"chains_created": 0,
"chains_extended": 0,
"chains_status_updated": 0,
"fts_indexed": 0,
"errors": [],
}
try:
# Phase 1: Import
print(f"\n📥 Phase 1: OParl-Import {'(dry-run)' if dry_run else ''}...")
new_ids = phase_import(conn, dry_run=dry_run, full=full)
summary["new_vorlagen"] = len(new_ids)
print(f"{len(new_ids)} neue Vorlagen")
if not new_ids:
print("\n✅ Keine neuen Vorlagen — Sync abgeschlossen.")
summary["finished_at"] = datetime.now().isoformat()
summary["duration_s"] = (datetime.now() - start).total_seconds()
save_sync_state(summary)
return summary
# Phase 2: Beratungsfolge scrapen
print(f"\n🔍 Phase 2: Beratungsfolge scrapen {'(dry-run)' if dry_run else ''}...")
scraped = phase_scrape(conn, new_ids, dry_run=dry_run)
summary["scraped"] = scraped
print(f"{scraped} Vorlagen gescrapt")
# Phase 3: Ketten-Zuordnung
print(f"\n🔗 Phase 3: Ketten-Zuordnung {'(dry-run)' if dry_run else ''}...")
chain_result = phase_chains(conn, new_ids, dry_run=dry_run)
summary["chains_created"] = chain_result["created"]
summary["chains_extended"] = chain_result["extended"]
affected = chain_result["affected_chain_ids"]
print(f"{chain_result['created']} neue Ketten, {chain_result['extended']} erweitert")
# Phase 4: Status-Engine
print(f"\n⚡ Phase 4: Status-Engine {'(dry-run)' if dry_run else ''}...")
status_updated = phase_status(conn, affected, dry_run=dry_run)
summary["chains_status_updated"] = status_updated
print(f"{status_updated} Ketten aktualisiert")
# Phase 5: FTS5-Index
print(f"\n📝 Phase 5: FTS5-Index {'(dry-run)' if dry_run else ''}...")
fts_count = phase_fts(conn, new_ids, dry_run=dry_run)
summary["fts_indexed"] = fts_count
print(f"{fts_count} Einträge indexiert")
except Exception as e:
summary["errors"].append(str(e))
print(f"\n❌ Fehler: {e}")
import traceback
traceback.print_exc()
finally:
conn.close()
# Zusammenfassung
end = datetime.now()
duration = (end - start).total_seconds()
summary["finished_at"] = end.isoformat()
summary["duration_s"] = duration
if not dry_run:
save_sync_state(summary)
print(f"\n{'' * 60}")
print(f" Zusammenfassung")
print(f"{'' * 60}")
print(f" Neue Vorlagen: {summary['new_vorlagen']}")
print(f" Gescrapt: {summary['scraped']}")
print(f" Ketten erstellt: {summary['chains_created']}")
print(f" Ketten erweitert: {summary['chains_extended']}")
print(f" Status aktualisiert:{summary['chains_status_updated']}")
print(f" FTS indexiert: {summary['fts_indexed']}")
print(f" Dauer: {duration:.1f}s")
if summary["errors"]:
print(f" Fehler: {len(summary['errors'])}")
print(f"{'' * 60}")
return summary
def main():
parser = argparse.ArgumentParser(description="OParl-Sync für Antragstracker Hagen")
parser.add_argument("--dry-run", action="store_true",
help="Zeige was passieren würde, ohne zu importieren")
parser.add_argument("--full", action="store_true",
help="Vollständiger Re-Import (nicht nur inkrementell)")
args = parser.parse_args()
summary = sync(dry_run=args.dry_run, full=args.full)
# Exit 0 = Erfolg (cron-fähig)
sys.exit(1 if summary.get("errors") else 0)
if __name__ == "__main__":
main()