gwoe-antragspruefer/app/wahlprogramm_fetch.py

402 lines
15 KiB
Python
Raw Permalink Normal View History

"""Halbautomatische Wahlprogramm-Beschaffung (#138).
Workflow:
1. ``check_missing_programmes(bl, fraktionen)`` liefert Lücken
2. ``suggest_candidates(bl, partei)`` schlägt URL-Kandidaten vor
(aus wahlprogramm-links.yaml im selben Verzeichnis)
3. ``fetch_and_verify(url, dest_path, expected_sha)`` lädt, prüft SHA-256,
speichert in app/static/referenzen/ oder bricht bei SHA-Abweichung ab
4. Re-Indexing via ``reindex_embeddings`` muss danach manuell ausgelöst werden
CLI:
python -m app.wahlprogramm_fetch --check [--bl BL]
python -m app.wahlprogramm_fetch --fetch BL PARTEI [--yes]
"""
from __future__ import annotations
import hashlib
import json
import logging
import urllib.request
from pathlib import Path
from typing import Optional
import yaml
logger = logging.getLogger(__name__)
_LINKS_FILE = Path(__file__).parent / "wahlprogramm-links.yaml"
_LOCK_FILE = Path(__file__).parent / "wahlprogramm-shas.lock.json"
_REFERENZEN_DIR = Path(__file__).parent / "static" / "referenzen"
# ---------------------------------------------------------------------------
# SHA-Lock — schuetzt vor stillem PDF-Austausch unter gleicher URL.
# Hintergrund: abgeordnetenwatch hat die CDU-BE-2023-Datei intern gegen den
# 2026-Berlin-Plan ersetzt, ohne den Slug zu aendern. Nach dem ersten
# erfolgreichen Download wird der SHA-256 hier gepinnt; spaetere fetches
# vergleichen gegen den Lock und brechen bei Abweichung ab.
# ---------------------------------------------------------------------------
def _load_lock() -> dict[str, str]:
if not _LOCK_FILE.exists():
return {}
try:
return json.loads(_LOCK_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError) as exc:
logger.error("Lock-File %s ist kaputt: %s — leerer Lock genutzt", _LOCK_FILE, exc)
return {}
def _save_lock(lock: dict[str, str]) -> None:
_LOCK_FILE.write_text(
json.dumps(lock, indent=2, sort_keys=True, ensure_ascii=False) + "\n",
encoding="utf-8",
)
def _lock_key(dateiname: str) -> str:
return dateiname
# ---------------------------------------------------------------------------
# YAML-Quelle laden
# ---------------------------------------------------------------------------
def _load_links() -> dict:
"""Lädt wahlprogramm-links.yaml. Gibt leeres Dict zurück, wenn Datei fehlt."""
if not _LINKS_FILE.exists():
logger.warning("wahlprogramm-links.yaml nicht gefunden: %s", _LINKS_FILE)
return {}
with _LINKS_FILE.open(encoding="utf-8") as fh:
return yaml.safe_load(fh) or {}
# ---------------------------------------------------------------------------
# Öffentliche API
# ---------------------------------------------------------------------------
def suggest_candidates(bundesland: str, partei: str) -> list[dict]:
"""Gibt URL-Kandidaten aus wahlprogramm-links.yaml für BL+Partei zurück.
Args:
bundesland: Bundesland-Code (z.B. "NRW").
partei: Partei-Kürzel (z.B. "BSW").
Returns:
Liste von Dicts mit mindestens ``url`` und ``titel``. Leer, wenn
keine Einträge vorhanden.
"""
data = _load_links()
bl_block = data.get(bundesland, {})
partei_block = bl_block.get(partei, [])
if isinstance(partei_block, dict):
partei_block = [partei_block]
return list(partei_block)
def sha256_of_file(path: Path) -> str:
"""Berechnet den SHA-256-Hash einer Datei als Hex-String."""
h = hashlib.sha256()
with path.open("rb") as fh:
for chunk in iter(lambda: fh.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def fetch_and_verify(
url: str,
dest_path: Path,
expected_sha: Optional[str] = None,
*,
accept_new_sha: bool = False,
) -> dict:
"""Lädt eine Datei herunter und prüft den SHA-256-Hash gegen den Lock.
SHA-Gate-Logik (Pferdetausch-Schutz):
- Beim ersten erfolgreichen Download wird der SHA in
``wahlprogramm-shas.lock.json`` gepinnt.
- Spätere fetches vergleichen gegen diesen gepinnten SHA. Abweichung
Abbruch, ausser ``accept_new_sha=True`` ist gesetzt (dann wird der Lock
explizit aktualisiert).
- ``expected_sha`` (z.B. aus YAML) ueberschreibt den Lock fuer diesen Call.
Args:
url: Download-URL der PDF-Datei.
dest_path: Ziel-Pfad (typischerweise in app/static/referenzen/).
expected_sha: Wenn angegeben, muss der Download-Hash übereinstimmen
(haerter als der Lock-Vergleich).
accept_new_sha: Wenn True, wird der Lock auf den neuen SHA aktualisiert
statt bei Abweichung abzubrechen. NICHT default Maintainer-Override.
Returns:
Dict mit den Schlüsseln:
- ``ok`` (bool): True bei Erfolg.
- ``sha256`` (str): SHA-256 der heruntergeladenen Datei.
- ``prev_sha256`` (str|None): SHA-256 der bisherigen Datei, falls vorhanden.
- ``locked_sha256`` (str|None): SHA aus dem Lock-File (vor diesem Call).
- ``error`` (str|None): Fehlermeldung bei Misserfolg.
- ``changed`` (bool): True, wenn sich die Datei geaendert hat.
- ``lock_updated`` (bool): True, wenn der Lock-Eintrag neu/ersetzt wurde.
"""
prev_sha: Optional[str] = None
if dest_path.exists():
prev_sha = sha256_of_file(dest_path)
lock = _load_lock()
lock_key = _lock_key(dest_path.name)
locked_sha = lock.get(lock_key)
tmp_path = dest_path.with_suffix(".tmp")
try:
logger.info("Lade %s%s", url, tmp_path)
_referenzen_dir = dest_path.parent
_referenzen_dir.mkdir(parents=True, exist_ok=True)
req = urllib.request.Request(
url,
headers={"User-Agent": "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"},
)
with urllib.request.urlopen(req, timeout=60) as resp:
tmp_path.write_bytes(resp.read())
new_sha = sha256_of_file(tmp_path)
# SHA-Gate gegen expected_sha (haerter, aus YAML kuratiert)
if expected_sha and new_sha != expected_sha:
tmp_path.unlink(missing_ok=True)
return {
"ok": False,
"sha256": new_sha,
"prev_sha256": prev_sha,
"locked_sha256": locked_sha,
"changed": False,
"lock_updated": False,
"error": (
f"SHA-Pruefung gegen erwarteten Hash fehlgeschlagen: "
f"erwartet {expected_sha[:12]}…, erhalten {new_sha[:12]}"
),
}
# SHA-Gate gegen Lock-File (Pferdetausch-Schutz)
if locked_sha and new_sha != locked_sha and not accept_new_sha:
tmp_path.unlink(missing_ok=True)
return {
"ok": False,
"sha256": new_sha,
"prev_sha256": prev_sha,
"locked_sha256": locked_sha,
"changed": False,
"lock_updated": False,
"error": (
f"Lock-Pruefung fehlgeschlagen: gepinnt {locked_sha[:12]}…, "
f"jetzt {new_sha[:12]}…. Pferdetausch-Verdacht — Inhalt manuell "
f"pruefen, dann mit --accept-new-sha bestaetigen."
),
}
# SHA-Gate gegen bisherige Datei (no-op)
if prev_sha and new_sha == prev_sha:
tmp_path.unlink(missing_ok=True)
lock_updated = False
if locked_sha != new_sha:
# Datei war schon korrekt, Lock fehlte — initialer Pin.
lock[lock_key] = new_sha
_save_lock(lock)
lock_updated = True
logger.info("Datei unveraendert (SHA %s…), kein Ueberschreiben.", new_sha[:12])
return {
"ok": True,
"sha256": new_sha,
"prev_sha256": prev_sha,
"locked_sha256": locked_sha,
"changed": False,
"lock_updated": lock_updated,
"error": None,
}
tmp_path.rename(dest_path)
# Lock aktualisieren — initialer Pin oder bewusstes Update via accept_new_sha
lock[lock_key] = new_sha
_save_lock(lock)
logger.info("Gespeichert: %s (SHA %s…)", dest_path.name, new_sha[:12])
return {
"ok": True,
"sha256": new_sha,
"prev_sha256": prev_sha,
"locked_sha256": locked_sha,
"changed": True,
"lock_updated": True,
"error": None,
}
except Exception as exc:
tmp_path.unlink(missing_ok=True)
logger.exception("Fehler beim Download von %s", url)
return {
"ok": False,
"sha256": "",
"prev_sha256": prev_sha,
"locked_sha256": locked_sha,
"changed": False,
"lock_updated": False,
"error": str(exc),
}
def get_missing_programmes(bundesland: Optional[str] = None) -> list[dict]:
"""Liefert alle BL/Partei-Kombinationen mit Kandidaten-URL, aber fehlender Datei.
Args:
bundesland: Wenn angegeben, nur dieses Bundesland prüfen.
Returns:
Liste von Dicts mit ``bl``, ``partei``, ``dateiname``, ``kandidaten``.
"""
from .wahlprogramme import WAHLPROGRAMME
missing: list[dict] = []
data = _load_links()
bl_keys = [bundesland] if bundesland else list(data.keys())
for bl in bl_keys:
bl_block = data.get(bl, {})
for partei, kandidaten in bl_block.items():
if isinstance(kandidaten, dict):
kandidaten = [kandidaten]
wp_info = WAHLPROGRAMME.get(bl, {}).get(partei)
if wp_info:
dateiname = wp_info["file"]
dest = _REFERENZEN_DIR / dateiname
if dest.exists():
continue # Datei liegt bereits vor
else:
dateiname = None # noch nicht in WAHLPROGRAMME registriert
missing.append({
"bl": bl,
"partei": partei,
"dateiname": dateiname,
"kandidaten": kandidaten,
})
return missing
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _cli() -> None:
import argparse
import sys
parser = argparse.ArgumentParser(
description="Halbautomatische Wahlprogramm-Beschaffung (#138)",
)
parser.add_argument("--check", action="store_true", help="Lücken auflisten")
parser.add_argument("--bl", help="Bundesland-Filter für --check")
parser.add_argument("--fetch", nargs=2, metavar=("BL", "PARTEI"),
help="Wahlprogramm für BL/PARTEI herunterladen")
parser.add_argument("--url", help="URL überschreiben (statt erster Kandidat aus YAML)")
parser.add_argument("--yes", action="store_true",
help="Nicht interaktiv bestätigen (gefährlich)")
parser.add_argument("--accept-new-sha", action="store_true",
help="Bei Lock-Mismatch: neuen SHA in den Lock uebernehmen (Pferdetausch-Override)")
parser.add_argument("--pin-existing", action="store_true",
help="Alle bereits vorhandenen PDFs in static/referenzen/ in den Lock pinnen "
"(einmalig nach Einfuehrung des Lock-Files)")
args = parser.parse_args()
if args.pin_existing:
from .wahlprogramme import WAHLPROGRAMME
lock = _load_lock()
added = 0
for bl, parteien in WAHLPROGRAMME.items():
for partei, info in parteien.items():
dateiname = info.get("file") if isinstance(info, dict) else None
if not dateiname:
continue
pdf_path = _REFERENZEN_DIR / dateiname
if not pdf_path.exists():
continue
key = _lock_key(dateiname)
if key in lock:
continue
lock[key] = sha256_of_file(pdf_path)
added += 1
print(f" pinned {bl}/{partei}: {dateiname}{lock[key][:12]}")
if added:
_save_lock(lock)
print(f"\n{added} neue Eintraege in {_LOCK_FILE.name}.")
else:
print("Keine neuen Eintraege — alle vorhandenen PDFs sind bereits gepinnt.")
sys.exit(0)
if args.check:
missing = get_missing_programmes(args.bl)
if not missing:
print("Keine Lücken gefunden.")
for entry in missing:
cands = entry["kandidaten"]
cand_str = cands[0]["url"] if cands else "(keine URL hinterlegt)"
print(
f" {entry['bl']:6} {entry['partei']:15} "
f"{'(noch nicht registriert)' if not entry['dateiname'] else entry['dateiname']:35} "
f"{cand_str}"
)
sys.exit(0)
if args.fetch:
bl, partei = args.fetch
candidates = suggest_candidates(bl, partei)
if args.url:
url = args.url
elif candidates:
url = candidates[0]["url"]
print(f"Kandidat: {url}")
else:
print(f"Keine URL-Kandidaten für {bl}/{partei} in wahlprogramm-links.yaml.")
sys.exit(1)
from .wahlprogramme import WAHLPROGRAMME
wp_info = WAHLPROGRAMME.get(bl, {}).get(partei)
if not wp_info:
print(
f"WARNUNG: {bl}/{partei} ist noch nicht in wahlprogramme.py eingetragen.\n"
"Die Datei wird heruntergeladen, muss aber manuell registriert werden."
)
dateiname = f"{partei.lower()}-{bl.lower()}-neu.pdf"
else:
dateiname = wp_info["file"]
dest = _REFERENZEN_DIR / dateiname
if not args.yes:
confirm = input(f"Download {url}{dest}? [j/N] ").strip().lower()
if confirm not in ("j", "ja", "y", "yes"):
print("Abgebrochen.")
sys.exit(0)
result = fetch_and_verify(url, dest, accept_new_sha=args.accept_new_sha)
if result["ok"]:
change_note = "geaendert" if result["changed"] else "unveraendert"
print(f"OK ({change_note}) — SHA-256: {result['sha256'][:16]}")
if result["lock_updated"]:
print(f"Lock aktualisiert in {_LOCK_FILE.name}.")
if result["changed"]:
print("Hinweis: Embeddings muessen neu indexiert werden (python -m app.reindex_embeddings).")
else:
print(f"FEHLER: {result['error']}")
sys.exit(1)
sys.exit(0)
parser.print_help()
if __name__ == "__main__":
_cli()