antragstracker/scripts/sync_oparl.py
Dotty Dotter c3e9f4b3e8 feat: Automatischer OParl-Sync (#3)
- scripts/sync_oparl.py: 5-Phasen-Sync (Import → Scrape → Ketten → Status → FTS5)
- Inkrementell: Nur neue Papers, stoppt nach 3 leeren Seiten
- Dry-Run-Modus (--dry-run)
- API: GET /api/sync/status + POST /api/sync/trigger
- Cron-fähig (Exit 0/1, stdout-Logging)
- Sync-State in data/sync_state.json
- 11 neue Vorlagen beim Dry-Run erkannt

Closes #3
2026-04-02 15:26:34 +02:00

618 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
OParl-Sync für den Antragstracker Hagen.
Führt einen inkrementellen Sync durch:
1. Neue Papers von der OParl-API importieren
2. Beratungsfolge für neue Vorlagen scrapen (ALLRIS)
3. Neue Vorlagen in Ketten einordnen (Suffix-Matching)
4. Status-Engine für betroffene Ketten laufen lassen
5. FTS5-Index aktualisieren
6. Zusammenfassung ausgeben
Nutzung:
python scripts/sync_oparl.py # Normaler Sync
python scripts/sync_oparl.py --dry-run # Nur zeigen was passieren würde
python scripts/sync_oparl.py --full # Vollständiger Re-Import
Cron-fähig: Exit 0 bei Erfolg, Logging nach stdout.
# Täglich 06:00: OParl-Sync
# 0 6 * * * cd /path/to/antragstracker && PYTHONPATH=src python3 scripts/sync_oparl.py >> data/sync.log 2>&1
"""
import argparse
import functools
import json
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path
# Unbuffered print
print = functools.partial(print, flush=True)
# Projekt-Root
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker.db"
SYNC_STATE_PATH = PROJECT_ROOT / "data" / "sync_state.json"
# Füge src zum Path hinzu für tracker-Imports
sys.path.insert(0, str(PROJECT_ROOT / "backend" / "src"))
def get_db() -> sqlite3.Connection:
"""Öffne DB-Verbindung."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
return conn
def load_sync_state() -> dict:
"""Lade letzten Sync-State."""
if SYNC_STATE_PATH.exists():
try:
return json.loads(SYNC_STATE_PATH.read_text())
except Exception:
pass
return {}
def save_sync_state(state: dict):
"""Speichere Sync-State."""
SYNC_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
SYNC_STATE_PATH.write_text(json.dumps(state, indent=2, ensure_ascii=False))
# ──────────────────────────────────────────────
# Phase 1: Inkrementeller OParl-Import
# ──────────────────────────────────────────────
def phase_import(conn: sqlite3.Connection, dry_run: bool = False, full: bool = False) -> list[int]:
"""Importiere neue Papers von der OParl-API.
Returns: Liste der neuen vorlage_ids.
"""
# Importiere die OParl-Parsing-Logik
sys.path.insert(0, str(PROJECT_ROOT / "scripts"))
import httpx
from import_oparl import (
PAPERS_URL,
fetch_page,
upsert_paper,
upsert_consultations,
insert_files,
build_suffix_references,
)
client = httpx.Client(
headers={"Accept": "application/json"},
follow_redirects=True,
)
try:
# Seitenanzahl ermitteln
first = fetch_page(client, PAPERS_URL, {"body": 1, "page": 1})
if not first or "pagination" not in first:
print(" FEHLER: Konnte OParl-API nicht erreichen")
return []
total_pages = first["pagination"]["totalPages"]
total_elements = first["pagination"]["totalElements"]
existing = conn.execute("SELECT COUNT(*) FROM vorlagen").fetchone()[0]
print(f" API: {total_elements} Papers auf {total_pages} Seiten, DB: {existing}")
if dry_run:
# Im Dry-Run: Nur erste Seite checken
new_ids = []
for paper in first.get("data", []):
oparl_id = paper.get("id")
if oparl_id:
exists = conn.execute(
"SELECT id FROM vorlagen WHERE oparl_id = ?", (oparl_id,)
).fetchone()
if not exists:
ref = paper.get("reference", "?")
name = paper.get("name", "?")[:60]
print(f" NEU: {ref}{name}")
new_ids.append(-1) # Placeholder
return new_ids
# Inkrementeller Import: von Seite 1, stoppe bei bekannten
new_ids = []
consecutive_known = 0
max_pages = total_pages if full else total_pages # Bei full alle Seiten
for page_num in range(1, max_pages + 1):
if page_num == 1:
data = first
else:
data = fetch_page(client, PAPERS_URL, {"body": 1, "page": page_num})
if not data or "data" not in data:
continue
page_new = 0
for paper in data["data"]:
vorlage_id, is_new = upsert_paper(conn, paper)
if vorlage_id:
upsert_consultations(conn, vorlage_id, paper)
insert_files(conn, vorlage_id, paper)
if is_new:
new_ids.append(vorlage_id)
page_new += 1
conn.commit()
if page_new > 0:
print(f" Seite {page_num}/{total_pages}: +{page_new} neue")
consecutive_known = 0
else:
consecutive_known += 1
# Inkrementell: nach 3 Seiten ohne neue aufhören
if not full and consecutive_known >= 3:
print(f" Stoppe nach {page_num} Seiten (3x ohne neue)")
break
time.sleep(0.3)
# Suffix-Referenzen aktualisieren
if new_ids:
build_suffix_references(conn)
return new_ids
finally:
client.close()
# ──────────────────────────────────────────────
# Phase 2: Beratungsfolge scrapen
# ──────────────────────────────────────────────
def phase_scrape(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> int:
"""Scrape Beratungsfolge für neue Vorlagen.
Returns: Anzahl gescrapte Vorlagen.
"""
if not new_ids:
return 0
from tracker.core.rescrape import _rescrape_vorlage_impl
scraped = 0
errors = 0
for vid in new_ids:
row = conn.execute(
"SELECT aktenzeichen, web_url FROM vorlagen WHERE id = ?", (vid,)
).fetchone()
if not row or not row["web_url"]:
continue
if dry_run:
print(f" Würde scrapen: {row['aktenzeichen']}")
scraped += 1
continue
try:
result = _rescrape_vorlage_impl(conn, vid)
n_ber = result.get("updated_beratungen", 0)
has_vt = result.get("updated_volltext", False)
errs = result.get("errors", [])
if errs:
errors += 1
print(f" ⚠️ {row['aktenzeichen']}: {n_ber} Beratungen, Fehler: {errs[0][:80]}")
elif n_ber > 0 or has_vt:
print(f"{row['aktenzeichen']}: {n_ber} Beratungen" +
(", Volltext extrahiert" if has_vt else ""))
scraped += 1
except Exception as e:
errors += 1
print(f"{row['aktenzeichen']}: {e}")
# Rate-Limiting: 1s zwischen ALLRIS-Requests
time.sleep(1.0)
conn.commit()
if errors:
print(f" {errors} Fehler beim Scrapen")
return scraped
# ──────────────────────────────────────────────
# Phase 3: Ketten-Zuordnung (Suffix-Matching)
# ──────────────────────────────────────────────
def phase_chains(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> dict:
"""Ordne neue Vorlagen in bestehende Ketten ein.
Returns: {"created": N, "extended": N, "affected_chain_ids": [...]}
"""
result = {"created": 0, "extended": 0, "affected_chain_ids": set()}
if not new_ids:
return result
for vid in new_ids:
row = conn.execute(
"SELECT id, aktenzeichen, aktenzeichen_basis, aktenzeichen_suffix, typ, betreff, datum_eingang "
"FROM vorlagen WHERE id = ?", (vid,)
).fetchone()
if not row:
continue
# Bereits in einer Kette?
already = conn.execute(
"SELECT kette_id FROM ketten_glieder WHERE vorlage_id = ?", (vid,)
).fetchone()
if already:
result["affected_chain_ids"].add(already["kette_id"])
continue
basis_az = row["aktenzeichen_basis"]
suffix = row["aktenzeichen_suffix"]
if not basis_az:
continue
# Nur Suffix-Dokumente in Ketten einordnen
if suffix:
# Finde Basis-Dokument
basis = conn.execute(
"SELECT id, aktenzeichen, typ, betreff FROM vorlagen "
"WHERE aktenzeichen_basis = ? AND (aktenzeichen_suffix IS NULL OR aktenzeichen_suffix = '')",
(basis_az,)
).fetchone()
if not basis:
continue
# Ist die Basis schon in einer Kette?
existing_chain = conn.execute(
"SELECT kette_id FROM ketten_glieder WHERE vorlage_id = ?",
(basis["id"],)
).fetchone()
if existing_chain:
kette_id = existing_chain["kette_id"]
if dry_run:
print(f" Würde {row['aktenzeichen']} → Kette {kette_id} hinzufügen")
else:
max_pos = conn.execute(
"SELECT MAX(position) as mp FROM ketten_glieder WHERE kette_id = ?",
(kette_id,)
).fetchone()["mp"] or 0
conn.execute(
"INSERT OR IGNORE INTO ketten_glieder (kette_id, vorlage_id, position, rolle) "
"VALUES (?, ?, ?, ?)",
(kette_id, vid, max_pos + 1, _rolle(row["typ"]))
)
print(f" {row['aktenzeichen']} → Kette {kette_id}")
result["extended"] += 1
result["affected_chain_ids"].add(kette_id)
else:
# Neue Kette erstellen
typ = _chain_type(basis["typ"])
if typ not in ("antrag", "anfrage"):
continue
if dry_run:
print(f" Würde neue Kette: {basis['aktenzeichen']} + {row['aktenzeichen']}")
else:
conn.execute(
"INSERT INTO ketten (ursprung_id, typ, thema, status, begruendung) "
"VALUES (?, ?, ?, 'eingereicht', ?)",
(basis["id"], typ, basis["betreff"],
f"Automatisch erstellt via Sync: {basis['aktenzeichen']} + {row['aktenzeichen']}")
)
kette_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.execute(
"INSERT INTO ketten_glieder (kette_id, vorlage_id, position, rolle) VALUES (?, ?, 0, 'ursprung')",
(kette_id, basis["id"])
)
conn.execute(
"INSERT INTO ketten_glieder (kette_id, vorlage_id, position, rolle) VALUES (?, ?, 1, ?)",
(kette_id, vid, _rolle(row["typ"]))
)
print(f" 🆕 Kette {kette_id}: {basis['aktenzeichen']} + {row['aktenzeichen']}")
result["affected_chain_ids"].add(kette_id)
result["created"] += 1
else:
# Basis-Dokument: Checke ob es Suffixe gibt die schon in Ketten sind
chain_row = conn.execute("""
SELECT kg.kette_id FROM ketten_glieder kg
JOIN vorlagen v ON kg.vorlage_id = v.id
WHERE v.aktenzeichen_basis = ? AND v.aktenzeichen_suffix IS NOT NULL
LIMIT 1
""", (basis_az,)).fetchone()
if chain_row:
kette_id = chain_row["kette_id"]
if dry_run:
print(f" Würde Basis {row['aktenzeichen']} → Kette {kette_id} (Pos. 0)")
else:
# Basis an Position 0 einfügen, Rest hochschieben
conn.execute(
"UPDATE ketten_glieder SET position = position + 1 WHERE kette_id = ?",
(kette_id,)
)
conn.execute(
"INSERT OR IGNORE INTO ketten_glieder (kette_id, vorlage_id, position, rolle) "
"VALUES (?, ?, 0, 'ursprung')",
(kette_id, vid)
)
conn.execute(
"UPDATE ketten SET ursprung_id = ? WHERE id = ?",
(vid, kette_id)
)
print(f" 📎 Basis {row['aktenzeichen']} → Kette {kette_id} (Pos. 0)")
result["extended"] += 1
result["affected_chain_ids"].add(kette_id)
if not dry_run:
conn.commit()
return result
def _rolle(typ):
if typ in ("stellungnahme", "bericht"):
return "antwort"
return "folge"
def _chain_type(typ):
if typ == "antrag":
return "antrag"
elif typ == "anfrage":
return "anfrage"
return "sonstig"
# ──────────────────────────────────────────────
# Phase 4: Status-Engine
# ──────────────────────────────────────────────
def phase_status(conn: sqlite3.Connection, chain_ids: set[int], dry_run: bool = False) -> int:
"""Aktualisiere Status für betroffene Ketten.
Returns: Anzahl aktualisierter Ketten.
"""
if not chain_ids:
return 0
from tracker.core.status import compute_status
updated = 0
for kette_id in chain_ids:
kette = conn.execute(
"SELECT id, ursprung_id, typ FROM ketten WHERE id = ?", (kette_id,)
).fetchone()
if not kette:
continue
members = conn.execute("""
SELECT v.id, v.aktenzeichen, v.aktenzeichen_suffix, v.typ,
v.datum_eingang, v.betreff
FROM ketten_glieder kg
JOIN vorlagen v ON kg.vorlage_id = v.id
WHERE kg.kette_id = ?
ORDER BY kg.position
""", (kette_id,)).fetchall()
if not members:
continue
status_info = compute_status(conn, kette["ursprung_id"], kette["typ"], members)
if dry_run:
print(f" Kette {kette_id}: Status → {status_info['status']}")
else:
dates = [m["datum_eingang"] for m in members if m["datum_eingang"]]
letzte_aktivitaet = max(dates) if dates else None
conn.execute("""
UPDATE ketten SET status = ?, status_seit = ?, letzte_aktivitaet = ?,
vertagungen_count = ?, begruendung = ?
WHERE id = ?
""", (
status_info["status"],
status_info.get("status_seit"),
letzte_aktivitaet,
status_info.get("vertagungen_count", 0),
status_info.get("begruendung"),
kette_id,
))
updated += 1
if not dry_run:
conn.commit()
return updated
# ──────────────────────────────────────────────
# Phase 5: FTS5-Index aktualisieren
# ──────────────────────────────────────────────
def phase_fts(conn: sqlite3.Connection, new_ids: list[int], dry_run: bool = False) -> int:
"""Aktualisiere FTS5-Index für neue Vorlagen.
Returns: Anzahl indexierter Vorlagen.
"""
if not new_ids:
return 0
# Prüfe ob FTS5-Tabelle existiert
exists = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vorlagen_fts'"
).fetchone()
if not exists:
print(" ⚠️ vorlagen_fts existiert nicht — FTS-Update übersprungen")
return 0
if dry_run:
return len(new_ids)
count = 0
for vid in new_ids:
row = conn.execute("""
SELECT v.id, v.aktenzeichen, v.betreff, v.volltext_clean,
kb.begruendung as zusammenfassung
FROM vorlagen v
LEFT JOIN ki_bewertungen kb ON kb.vorlage_id = v.id AND kb.typ = 'zusammenfassung'
WHERE v.id = ?
""", (vid,)).fetchone()
if not row:
continue
# Lösche ggf. alten Eintrag, dann neu einfügen
try:
conn.execute("INSERT INTO vorlagen_fts(vorlagen_fts, rowid, aktenzeichen, betreff, volltext, zusammenfassung) "
"VALUES('delete', ?, ?, ?, ?, ?)",
(row["id"], row["aktenzeichen"] or '', row["betreff"] or '',
row["volltext_clean"] or '', row["zusammenfassung"] or ''))
except Exception:
pass # Eintrag existierte nicht
conn.execute(
"INSERT INTO vorlagen_fts(rowid, aktenzeichen, betreff, volltext, zusammenfassung) "
"VALUES (?, ?, ?, ?, ?)",
(row["id"], row["aktenzeichen"] or '', row["betreff"] or '',
row["volltext_clean"] or '', row["zusammenfassung"] or '')
)
count += 1
conn.commit()
return count
# ──────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────
def sync(dry_run: bool = False, full: bool = False) -> dict:
"""Führe den kompletten Sync durch.
Returns: Zusammenfassung als dict.
"""
start = datetime.now()
print(f"{'' * 60}")
print(f" OParl-Sync — {start.strftime('%Y-%m-%d %H:%M:%S')}")
if dry_run:
print(" ⚡ DRY-RUN — keine Änderungen")
if full:
print(" 🔄 FULL — kompletter Re-Import")
print(f"{'' * 60}")
conn = get_db()
summary = {
"started_at": start.isoformat(),
"dry_run": dry_run,
"new_vorlagen": 0,
"scraped": 0,
"chains_created": 0,
"chains_extended": 0,
"chains_status_updated": 0,
"fts_indexed": 0,
"errors": [],
}
try:
# Phase 1: Import
print(f"\n📥 Phase 1: OParl-Import {'(dry-run)' if dry_run else ''}...")
new_ids = phase_import(conn, dry_run=dry_run, full=full)
summary["new_vorlagen"] = len(new_ids)
print(f"{len(new_ids)} neue Vorlagen")
if not new_ids:
print("\n✅ Keine neuen Vorlagen — Sync abgeschlossen.")
summary["finished_at"] = datetime.now().isoformat()
summary["duration_s"] = (datetime.now() - start).total_seconds()
save_sync_state(summary)
return summary
# Phase 2: Beratungsfolge scrapen
print(f"\n🔍 Phase 2: Beratungsfolge scrapen {'(dry-run)' if dry_run else ''}...")
scraped = phase_scrape(conn, new_ids, dry_run=dry_run)
summary["scraped"] = scraped
print(f"{scraped} Vorlagen gescrapt")
# Phase 3: Ketten-Zuordnung
print(f"\n🔗 Phase 3: Ketten-Zuordnung {'(dry-run)' if dry_run else ''}...")
chain_result = phase_chains(conn, new_ids, dry_run=dry_run)
summary["chains_created"] = chain_result["created"]
summary["chains_extended"] = chain_result["extended"]
affected = chain_result["affected_chain_ids"]
print(f"{chain_result['created']} neue Ketten, {chain_result['extended']} erweitert")
# Phase 4: Status-Engine
print(f"\n⚡ Phase 4: Status-Engine {'(dry-run)' if dry_run else ''}...")
status_updated = phase_status(conn, affected, dry_run=dry_run)
summary["chains_status_updated"] = status_updated
print(f"{status_updated} Ketten aktualisiert")
# Phase 5: FTS5-Index
print(f"\n📝 Phase 5: FTS5-Index {'(dry-run)' if dry_run else ''}...")
fts_count = phase_fts(conn, new_ids, dry_run=dry_run)
summary["fts_indexed"] = fts_count
print(f"{fts_count} Einträge indexiert")
except Exception as e:
summary["errors"].append(str(e))
print(f"\n❌ Fehler: {e}")
import traceback
traceback.print_exc()
finally:
conn.close()
# Zusammenfassung
end = datetime.now()
duration = (end - start).total_seconds()
summary["finished_at"] = end.isoformat()
summary["duration_s"] = duration
if not dry_run:
save_sync_state(summary)
print(f"\n{'' * 60}")
print(f" Zusammenfassung")
print(f"{'' * 60}")
print(f" Neue Vorlagen: {summary['new_vorlagen']}")
print(f" Gescrapt: {summary['scraped']}")
print(f" Ketten erstellt: {summary['chains_created']}")
print(f" Ketten erweitert: {summary['chains_extended']}")
print(f" Status aktualisiert:{summary['chains_status_updated']}")
print(f" FTS indexiert: {summary['fts_indexed']}")
print(f" Dauer: {duration:.1f}s")
if summary["errors"]:
print(f" Fehler: {len(summary['errors'])}")
print(f"{'' * 60}")
return summary
def main():
parser = argparse.ArgumentParser(description="OParl-Sync für Antragstracker Hagen")
parser.add_argument("--dry-run", action="store_true",
help="Zeige was passieren würde, ohne zu importieren")
parser.add_argument("--full", action="store_true",
help="Vollständiger Re-Import (nicht nur inkrementell)")
args = parser.parse_args()
summary = sync(dry_run=args.dry_run, full=args.full)
# Exit 0 = Erfolg (cron-fähig)
sys.exit(1 if summary.get("errors") else 0)
if __name__ == "__main__":
main()