"""Halbautomatische Wahlprogramm-Beschaffung (#138). Workflow: 1. ``check_missing_programmes(bl, fraktionen)`` liefert Lücken 2. ``suggest_candidates(bl, partei)`` schlägt URL-Kandidaten vor (aus wahlprogramm-links.yaml im selben Verzeichnis) 3. ``fetch_and_verify(url, dest_path, expected_sha)`` lädt, prüft SHA-256, speichert in app/static/referenzen/ — oder bricht bei SHA-Abweichung ab 4. Re-Indexing via ``reindex_embeddings`` muss danach manuell ausgelöst werden CLI: python -m app.wahlprogramm_fetch --check [--bl BL] python -m app.wahlprogramm_fetch --fetch BL PARTEI [--yes] """ from __future__ import annotations import hashlib import json import logging import urllib.request from pathlib import Path from typing import Optional import yaml logger = logging.getLogger(__name__) _LINKS_FILE = Path(__file__).parent / "wahlprogramm-links.yaml" _LOCK_FILE = Path(__file__).parent / "wahlprogramm-shas.lock.json" _REFERENZEN_DIR = Path(__file__).parent / "static" / "referenzen" # --------------------------------------------------------------------------- # SHA-Lock — schuetzt vor stillem PDF-Austausch unter gleicher URL. # Hintergrund: abgeordnetenwatch hat die CDU-BE-2023-Datei intern gegen den # 2026-Berlin-Plan ersetzt, ohne den Slug zu aendern. Nach dem ersten # erfolgreichen Download wird der SHA-256 hier gepinnt; spaetere fetches # vergleichen gegen den Lock und brechen bei Abweichung ab. # --------------------------------------------------------------------------- def _load_lock() -> dict[str, str]: if not _LOCK_FILE.exists(): return {} try: return json.loads(_LOCK_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError) as exc: logger.error("Lock-File %s ist kaputt: %s — leerer Lock genutzt", _LOCK_FILE, exc) return {} def _save_lock(lock: dict[str, str]) -> None: _LOCK_FILE.write_text( json.dumps(lock, indent=2, sort_keys=True, ensure_ascii=False) + "\n", encoding="utf-8", ) def _lock_key(dateiname: str) -> str: return dateiname # --------------------------------------------------------------------------- # YAML-Quelle laden # --------------------------------------------------------------------------- def _load_links() -> dict: """Lädt wahlprogramm-links.yaml. Gibt leeres Dict zurück, wenn Datei fehlt.""" if not _LINKS_FILE.exists(): logger.warning("wahlprogramm-links.yaml nicht gefunden: %s", _LINKS_FILE) return {} with _LINKS_FILE.open(encoding="utf-8") as fh: return yaml.safe_load(fh) or {} # --------------------------------------------------------------------------- # Öffentliche API # --------------------------------------------------------------------------- def suggest_candidates(bundesland: str, partei: str) -> list[dict]: """Gibt URL-Kandidaten aus wahlprogramm-links.yaml für BL+Partei zurück. Args: bundesland: Bundesland-Code (z.B. "NRW"). partei: Partei-Kürzel (z.B. "BSW"). Returns: Liste von Dicts mit mindestens ``url`` und ``titel``. Leer, wenn keine Einträge vorhanden. """ data = _load_links() bl_block = data.get(bundesland, {}) partei_block = bl_block.get(partei, []) if isinstance(partei_block, dict): partei_block = [partei_block] return list(partei_block) def sha256_of_file(path: Path) -> str: """Berechnet den SHA-256-Hash einer Datei als Hex-String.""" h = hashlib.sha256() with path.open("rb") as fh: for chunk in iter(lambda: fh.read(65536), b""): h.update(chunk) return h.hexdigest() def fetch_and_verify( url: str, dest_path: Path, expected_sha: Optional[str] = None, *, accept_new_sha: bool = False, ) -> dict: """Lädt eine Datei herunter und prüft den SHA-256-Hash gegen den Lock. SHA-Gate-Logik (Pferdetausch-Schutz): - Beim ersten erfolgreichen Download wird der SHA in ``wahlprogramm-shas.lock.json`` gepinnt. - Spätere fetches vergleichen gegen diesen gepinnten SHA. Abweichung → Abbruch, ausser ``accept_new_sha=True`` ist gesetzt (dann wird der Lock explizit aktualisiert). - ``expected_sha`` (z.B. aus YAML) ueberschreibt den Lock fuer diesen Call. Args: url: Download-URL der PDF-Datei. dest_path: Ziel-Pfad (typischerweise in app/static/referenzen/). expected_sha: Wenn angegeben, muss der Download-Hash übereinstimmen (haerter als der Lock-Vergleich). accept_new_sha: Wenn True, wird der Lock auf den neuen SHA aktualisiert statt bei Abweichung abzubrechen. NICHT default — Maintainer-Override. Returns: Dict mit den Schlüsseln: - ``ok`` (bool): True bei Erfolg. - ``sha256`` (str): SHA-256 der heruntergeladenen Datei. - ``prev_sha256`` (str|None): SHA-256 der bisherigen Datei, falls vorhanden. - ``locked_sha256`` (str|None): SHA aus dem Lock-File (vor diesem Call). - ``error`` (str|None): Fehlermeldung bei Misserfolg. - ``changed`` (bool): True, wenn sich die Datei geaendert hat. - ``lock_updated`` (bool): True, wenn der Lock-Eintrag neu/ersetzt wurde. """ prev_sha: Optional[str] = None if dest_path.exists(): prev_sha = sha256_of_file(dest_path) lock = _load_lock() lock_key = _lock_key(dest_path.name) locked_sha = lock.get(lock_key) tmp_path = dest_path.with_suffix(".tmp") try: logger.info("Lade %s → %s", url, tmp_path) _referenzen_dir = dest_path.parent _referenzen_dir.mkdir(parents=True, exist_ok=True) req = urllib.request.Request( url, headers={"User-Agent": "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"}, ) with urllib.request.urlopen(req, timeout=60) as resp: tmp_path.write_bytes(resp.read()) new_sha = sha256_of_file(tmp_path) # SHA-Gate gegen expected_sha (haerter, aus YAML kuratiert) if expected_sha and new_sha != expected_sha: tmp_path.unlink(missing_ok=True) return { "ok": False, "sha256": new_sha, "prev_sha256": prev_sha, "locked_sha256": locked_sha, "changed": False, "lock_updated": False, "error": ( f"SHA-Pruefung gegen erwarteten Hash fehlgeschlagen: " f"erwartet {expected_sha[:12]}…, erhalten {new_sha[:12]}…" ), } # SHA-Gate gegen Lock-File (Pferdetausch-Schutz) if locked_sha and new_sha != locked_sha and not accept_new_sha: tmp_path.unlink(missing_ok=True) return { "ok": False, "sha256": new_sha, "prev_sha256": prev_sha, "locked_sha256": locked_sha, "changed": False, "lock_updated": False, "error": ( f"Lock-Pruefung fehlgeschlagen: gepinnt {locked_sha[:12]}…, " f"jetzt {new_sha[:12]}…. Pferdetausch-Verdacht — Inhalt manuell " f"pruefen, dann mit --accept-new-sha bestaetigen." ), } # SHA-Gate gegen bisherige Datei (no-op) if prev_sha and new_sha == prev_sha: tmp_path.unlink(missing_ok=True) lock_updated = False if locked_sha != new_sha: # Datei war schon korrekt, Lock fehlte — initialer Pin. lock[lock_key] = new_sha _save_lock(lock) lock_updated = True logger.info("Datei unveraendert (SHA %s…), kein Ueberschreiben.", new_sha[:12]) return { "ok": True, "sha256": new_sha, "prev_sha256": prev_sha, "locked_sha256": locked_sha, "changed": False, "lock_updated": lock_updated, "error": None, } tmp_path.rename(dest_path) # Lock aktualisieren — initialer Pin oder bewusstes Update via accept_new_sha lock[lock_key] = new_sha _save_lock(lock) logger.info("Gespeichert: %s (SHA %s…)", dest_path.name, new_sha[:12]) return { "ok": True, "sha256": new_sha, "prev_sha256": prev_sha, "locked_sha256": locked_sha, "changed": True, "lock_updated": True, "error": None, } except Exception as exc: tmp_path.unlink(missing_ok=True) logger.exception("Fehler beim Download von %s", url) return { "ok": False, "sha256": "", "prev_sha256": prev_sha, "locked_sha256": locked_sha, "changed": False, "lock_updated": False, "error": str(exc), } def get_missing_programmes(bundesland: Optional[str] = None) -> list[dict]: """Liefert alle BL/Partei-Kombinationen mit Kandidaten-URL, aber fehlender Datei. Args: bundesland: Wenn angegeben, nur dieses Bundesland prüfen. Returns: Liste von Dicts mit ``bl``, ``partei``, ``dateiname``, ``kandidaten``. """ from .wahlprogramme import WAHLPROGRAMME missing: list[dict] = [] data = _load_links() bl_keys = [bundesland] if bundesland else list(data.keys()) for bl in bl_keys: bl_block = data.get(bl, {}) for partei, kandidaten in bl_block.items(): if isinstance(kandidaten, dict): kandidaten = [kandidaten] wp_info = WAHLPROGRAMME.get(bl, {}).get(partei) if wp_info: dateiname = wp_info["file"] dest = _REFERENZEN_DIR / dateiname if dest.exists(): continue # Datei liegt bereits vor else: dateiname = None # noch nicht in WAHLPROGRAMME registriert missing.append({ "bl": bl, "partei": partei, "dateiname": dateiname, "kandidaten": kandidaten, }) return missing # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def _cli() -> None: import argparse import sys parser = argparse.ArgumentParser( description="Halbautomatische Wahlprogramm-Beschaffung (#138)", ) parser.add_argument("--check", action="store_true", help="Lücken auflisten") parser.add_argument("--bl", help="Bundesland-Filter für --check") parser.add_argument("--fetch", nargs=2, metavar=("BL", "PARTEI"), help="Wahlprogramm für BL/PARTEI herunterladen") parser.add_argument("--url", help="URL überschreiben (statt erster Kandidat aus YAML)") parser.add_argument("--yes", action="store_true", help="Nicht interaktiv bestätigen (gefährlich)") parser.add_argument("--accept-new-sha", action="store_true", help="Bei Lock-Mismatch: neuen SHA in den Lock uebernehmen (Pferdetausch-Override)") parser.add_argument("--pin-existing", action="store_true", help="Alle bereits vorhandenen PDFs in static/referenzen/ in den Lock pinnen " "(einmalig nach Einfuehrung des Lock-Files)") args = parser.parse_args() if args.pin_existing: from .wahlprogramme import WAHLPROGRAMME lock = _load_lock() added = 0 for bl, parteien in WAHLPROGRAMME.items(): for partei, info in parteien.items(): dateiname = info.get("file") if isinstance(info, dict) else None if not dateiname: continue pdf_path = _REFERENZEN_DIR / dateiname if not pdf_path.exists(): continue key = _lock_key(dateiname) if key in lock: continue lock[key] = sha256_of_file(pdf_path) added += 1 print(f" pinned {bl}/{partei}: {dateiname} → {lock[key][:12]}…") if added: _save_lock(lock) print(f"\n{added} neue Eintraege in {_LOCK_FILE.name}.") else: print("Keine neuen Eintraege — alle vorhandenen PDFs sind bereits gepinnt.") sys.exit(0) if args.check: missing = get_missing_programmes(args.bl) if not missing: print("Keine Lücken gefunden.") for entry in missing: cands = entry["kandidaten"] cand_str = cands[0]["url"] if cands else "(keine URL hinterlegt)" print( f" {entry['bl']:6} {entry['partei']:15} " f"{'(noch nicht registriert)' if not entry['dateiname'] else entry['dateiname']:35} " f"→ {cand_str}" ) sys.exit(0) if args.fetch: bl, partei = args.fetch candidates = suggest_candidates(bl, partei) if args.url: url = args.url elif candidates: url = candidates[0]["url"] print(f"Kandidat: {url}") else: print(f"Keine URL-Kandidaten für {bl}/{partei} in wahlprogramm-links.yaml.") sys.exit(1) from .wahlprogramme import WAHLPROGRAMME wp_info = WAHLPROGRAMME.get(bl, {}).get(partei) if not wp_info: print( f"WARNUNG: {bl}/{partei} ist noch nicht in wahlprogramme.py eingetragen.\n" "Die Datei wird heruntergeladen, muss aber manuell registriert werden." ) dateiname = f"{partei.lower()}-{bl.lower()}-neu.pdf" else: dateiname = wp_info["file"] dest = _REFERENZEN_DIR / dateiname if not args.yes: confirm = input(f"Download {url} → {dest}? [j/N] ").strip().lower() if confirm not in ("j", "ja", "y", "yes"): print("Abgebrochen.") sys.exit(0) result = fetch_and_verify(url, dest, accept_new_sha=args.accept_new_sha) if result["ok"]: change_note = "geaendert" if result["changed"] else "unveraendert" print(f"OK ({change_note}) — SHA-256: {result['sha256'][:16]}…") if result["lock_updated"]: print(f"Lock aktualisiert in {_LOCK_FILE.name}.") if result["changed"]: print("Hinweis: Embeddings muessen neu indexiert werden (python -m app.reindex_embeddings).") else: print(f"FEHLER: {result['error']}") sys.exit(1) sys.exit(0) parser.print_help() if __name__ == "__main__": _cli()