# gwoe-antragspruefer/tests/test_wahlprogramm_fetch.py
#
# (web-view residue kept for reference: 419 lines, 17 KiB, Python;
#  page controls: Raw / Permalink / Normal View / History)

"""Tests für wahlprogramm_fetch.py (#138) — SHA-Gate und Kandidaten-Suche."""
from __future__ import annotations
import hashlib
import sys
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Stub yaml, damit der Import ohne PyPI-Paket läuft
# ---------------------------------------------------------------------------
# Register a minimal stand-in for ``yaml`` only when no module of that name
# has been imported yet, so the import of the module under test succeeds
# without the PyPI package being installed.
if "yaml" not in sys.modules:

    def _safe_load(fh):
        """Stand-in for ``yaml.safe_load``: ignore ``fh`` and return an empty dict."""
        return {}

    _yaml_mod = types.ModuleType("yaml")
    _yaml_mod.safe_load = _safe_load
    sys.modules["yaml"] = _yaml_mod
from app.wahlprogramm_fetch import (
fetch_and_verify,
sha256_of_file,
suggest_candidates,
)
from app.og_card import cache_key as og_cache_key
# ---------------------------------------------------------------------------
# Hilfsfunktion: deterministische SHA-256 einer Inline-Byte-Folge
# ---------------------------------------------------------------------------
def _sha(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of ``data``."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# Test 1: suggest_candidates — YAML-Lücke liefert leere Liste
# ---------------------------------------------------------------------------
class TestSuggestCandidates:
    """Checks ``suggest_candidates`` against patched ``_load_links`` data."""

    @staticmethod
    def _suggest(links, bundesland, partei):
        """Call ``suggest_candidates`` while ``_load_links`` returns ``links``."""
        with patch("app.wahlprogramm_fetch._load_links", return_value=links):
            return suggest_candidates(bundesland, partei)

    def test_returns_empty_when_no_yaml_entry(self):
        """State/party without a YAML entry → empty list, no error."""
        candidates = self._suggest({}, "NRW", "BSW")
        assert candidates == []

    def test_returns_list_for_known_entry(self):
        """Known YAML entry → list containing the configured dict."""
        entry = {"url": "https://example.com/bsw.pdf", "titel": "BSW-Programm", "jahr": 2022}
        links = {"NRW": {"BSW": [entry]}}
        candidates = self._suggest(links, "NRW", "BSW")
        assert len(candidates) == 1
        assert candidates[0]["url"] == "https://example.com/bsw.pdf"

    def test_single_dict_is_wrapped_in_list(self):
        """A single dict (instead of a list) is transparently returned as a list."""
        entry = {"url": "https://example.com/pir.pdf", "titel": "Piraten", "jahr": 2022}
        links = {"NRW": {"PIRATEN": entry}}
        candidates = self._suggest(links, "NRW", "PIRATEN")
        assert isinstance(candidates, list)
        assert candidates[0]["url"] == "https://example.com/pir.pdf"
# ---------------------------------------------------------------------------
# Test 2: sha256_of_file — korrekte Berechnung
# ---------------------------------------------------------------------------
class TestSha256OfFile:
    """Checks ``sha256_of_file`` against a digest computed directly via hashlib."""

    def test_matches_hashlib_direct(self, tmp_path):
        """The function's SHA-256 equals the digest of the bytes written to disk."""
        payload = b"Gemeinwohl-\xc3\x96konomie"
        target = tmp_path / "test.bin"
        target.write_bytes(payload)
        expected = _sha(payload)
        assert sha256_of_file(target) == expected
# ---------------------------------------------------------------------------
# Test 3: fetch_and_verify — Download-Stub ohne echte HTTP-Verbindung
# ---------------------------------------------------------------------------
class TestFetchAndVerify:
    """Exercises ``fetch_and_verify`` with ``urllib.request.urlopen`` stubbed out.

    No real HTTP connection is made; every test swaps in a fake opener.
    """

    # NOTE(review): unlike TestShaLock, these tests do not patch
    # ``app.wahlprogramm_fetch._LOCK_FILE`` — confirm they cannot touch the
    # real lock file.

    _URL = "https://example.com/test.pdf"
    _BODY = b"%PDF-1.4 fake-content"

    class _StubResponse:
        """Context-manager compatible response whose ``read`` yields fixed bytes."""

        def __init__(self, payload):
            self._payload = payload

        def read(self):
            return self._payload

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return None

    def _fake_urlopen(self, url_or_req, timeout=None):
        """Return a context-manager compatible fake response object."""
        return self._StubResponse(self._BODY)

    def test_download_new_file(self, tmp_path):
        """A new file is downloaded and stored correctly."""
        target = tmp_path / "test.pdf"
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is True
        assert outcome["changed"] is True
        assert target.exists()
        assert outcome["sha256"] == _sha(b"%PDF-1.4 fake-content")

    def test_unchanged_file_not_overwritten(self, tmp_path):
        """An already present, identical file is not stored again."""
        target = tmp_path / "test.pdf"
        target.write_bytes(b"%PDF-1.4 fake-content")
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is True
        assert outcome["changed"] is False

    def test_sha_gate_rejects_wrong_hash(self, tmp_path):
        """Wrong expected_sha → file is not stored, ok=False."""
        target = tmp_path / "test.pdf"
        bogus_sha = "a" * 64
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target, expected_sha=bogus_sha)
        assert outcome["ok"] is False
        assert not target.exists()
        # The error may be None; normalise so the substring check cannot raise.
        error_text = outcome["error"] or ""
        assert "SHA" in error_text

    def test_network_error_returns_ok_false(self, tmp_path):
        """Network error → ok=False, no uncontrolled crash."""
        target = tmp_path / "test.pdf"
        failing_opener = MagicMock(side_effect=OSError("Connection refused"))
        with patch("urllib.request.urlopen", failing_opener):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is False
        assert not target.exists()

    def test_prev_sha_captured_before_overwrite(self, tmp_path):
        """prev_sha256 is set correctly when the file existed beforehand."""
        previous = b"old-version"
        replacement = b"%PDF-1.4 fake-content"
        target = tmp_path / "test.pdf"
        target.write_bytes(previous)
        expected_prev = _sha(previous)

        def _serve_replacement(url_or_req, timeout=None):
            return self._StubResponse(replacement)

        with patch("urllib.request.urlopen", _serve_replacement):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["prev_sha256"] == expected_prev
        assert outcome["changed"] is True
# ---------------------------------------------------------------------------
# Test 4: SHA-Lock-File — Pferdetausch-Schutz (#138)
# ---------------------------------------------------------------------------
class TestShaLock:
    """Regression: abgeordnetenwatch replaced the CDU-BE-2023 PDF with the
    CDU-BE-2026 PDF under the old slug name. The lock-file mechanism must
    catch such silent swaps."""

    def _patch_lock_file(self, tmp_path):
        """Point the lock-file path at a location under ``tmp_path`` for the test."""
        lock_target = tmp_path / "lock.json"
        return patch("app.wahlprogramm_fetch._LOCK_FILE", lock_target)

    def _urlopen_with(self, content: bytes):
        """Build a fake ``urlopen`` whose response body is ``content``."""

        class _CannedResponse:
            def read(self):
                return content

            def __enter__(self):
                return self

            def __exit__(self, *exc_info):
                return None

        def _opener(url_or_req, timeout=None):
            return _CannedResponse()

        return _opener

    @staticmethod
    def _write_lock(tmp_path, pins):
        """Serialise ``pins`` as JSON into ``lock.json`` under ``tmp_path``."""
        import json

        (tmp_path / "lock.json").write_text(json.dumps(pins))

    @staticmethod
    def _read_lock(tmp_path):
        """Parse and return the JSON content of ``lock.json`` under ``tmp_path``."""
        import json

        return json.loads((tmp_path / "lock.json").read_text())

    def test_first_download_pins_sha(self, tmp_path):
        """First download → lock file is created with the new SHA."""
        target = tmp_path / "cdu-be.pdf"
        body = b"%PDF original CDU BE 2021"
        with self._patch_lock_file(tmp_path), \
                patch("urllib.request.urlopen", self._urlopen_with(body)):
            outcome = fetch_and_verify("https://example.com/cdu-be.pdf", target)
        assert outcome["ok"] is True
        assert outcome["lock_updated"] is True
        assert (tmp_path / "lock.json").exists()
        pins = self._read_lock(tmp_path)
        assert pins["cdu-be.pdf"] == _sha(body)

    def test_second_download_with_same_content_passes(self, tmp_path):
        """Second download with the same content → ok, changed=False."""
        target = tmp_path / "cdu-be.pdf"
        body = b"%PDF original CDU BE 2021"
        target.write_bytes(body)
        # Prepare the lock with the SHA of the content already on disk.
        self._write_lock(tmp_path, {"cdu-be.pdf": _sha(body)})
        with self._patch_lock_file(tmp_path), \
                patch("urllib.request.urlopen", self._urlopen_with(body)):
            outcome = fetch_and_verify("https://example.com/cdu-be.pdf", target)
        assert outcome["ok"] is True
        assert outcome["changed"] is False

    def test_pferdetausch_blocks_silent_replacement(self, tmp_path):
        """CRITICAL: 'CDU BE 2021' is stored locally, the server delivers
        'CDU BE 2026'. The lock holds the 2021 SHA → fetch must ABORT, not
        overwrite."""
        target = tmp_path / "cdu-be-2023.pdf"
        pinned_body = b"%PDF CDU Berlin 2021-2026 Wahlprogramm"
        swapped_body = b"%PDF CDU Berlin-Plan 2026 (replaced!)"
        target.write_bytes(pinned_body)
        # The lock pins the original SHA.
        self._write_lock(tmp_path, {"cdu-be-2023.pdf": _sha(pinned_body)})
        with self._patch_lock_file(tmp_path), \
                patch("urllib.request.urlopen", self._urlopen_with(swapped_body)):
            outcome = fetch_and_verify("https://example.com/cdu-be-2023.pdf", target)
        assert outcome["ok"] is False
        assert "Lock-Pruefung" in outcome["error"]
        # The file must NOT have been overwritten.
        assert target.read_bytes() == pinned_body

    def test_accept_new_sha_overrides_lock(self, tmp_path):
        """With accept_new_sha=True the lock is deliberately updated."""
        target = tmp_path / "linke-bb.pdf"
        old_body = b"%PDF v1"
        updated_body = b"%PDF v2 - intentional update"
        target.write_bytes(old_body)
        self._write_lock(tmp_path, {"linke-bb.pdf": _sha(old_body)})
        with self._patch_lock_file(tmp_path), \
                patch("urllib.request.urlopen", self._urlopen_with(updated_body)):
            outcome = fetch_and_verify(
                "https://example.com/linke-bb.pdf",
                target,
                accept_new_sha=True,
            )
        assert outcome["ok"] is True
        assert outcome["changed"] is True
        # The lock must carry the new SHA.
        pins = self._read_lock(tmp_path)
        assert pins["linke-bb.pdf"] == _sha(updated_body)

    def test_existing_file_without_lock_pins_silently(self, tmp_path):
        """File exists but the lock is missing (migration scenario): on the
        next identical fetch the SHA is pinned, no block."""
        target = tmp_path / "spd-mv.pdf"
        body = b"%PDF SPD MV 2021"
        target.write_bytes(body)
        # Deliberately no lock entry is written here.
        with self._patch_lock_file(tmp_path), \
                patch("urllib.request.urlopen", self._urlopen_with(body)):
            outcome = fetch_and_verify("https://example.com/spd-mv.pdf", target)
        assert outcome["ok"] is True
        assert outcome["lock_updated"] is True
        pins = self._read_lock(tmp_path)
        assert pins["spd-mv.pdf"] == _sha(body)
# ---------------------------------------------------------------------------
# Test 5: Lock-File und YAML-Robustheit (#134 Coverage-Backfill)
# ---------------------------------------------------------------------------
class TestLockFileRobustness:
    """Checks ``_load_lock`` / ``_save_lock`` against a patched ``_LOCK_FILE``."""

    def test_corrupt_lock_file_returns_empty_dict(self, tmp_path):
        """Broken JSON must not crash the caller — an empty lock is returned."""
        from app.wahlprogramm_fetch import _load_lock

        broken = tmp_path / "broken-lock.json"
        broken.write_text("{ this is not json ;)")
        with patch("app.wahlprogramm_fetch._LOCK_FILE", broken):
            loaded = _load_lock()
        assert loaded == {}

    def test_missing_lock_file_returns_empty_dict(self, tmp_path):
        """A lock path that does not exist yields an empty dict."""
        from app.wahlprogramm_fetch import _load_lock

        absent = tmp_path / "no-such-file.json"
        with patch("app.wahlprogramm_fetch._LOCK_FILE", absent):
            loaded = _load_lock()
        assert loaded == {}

    def test_save_lock_writes_valid_json(self, tmp_path):
        """``_save_lock`` writes JSON that parses back to the same mapping."""
        from app.wahlprogramm_fetch import _save_lock

        lock_path = tmp_path / "lock.json"
        with patch("app.wahlprogramm_fetch._LOCK_FILE", lock_path):
            _save_lock({"x.pdf": "abc123", "y.pdf": "def456"})

        import json

        round_tripped = json.loads(lock_path.read_text())
        assert round_tripped == {"x.pdf": "abc123", "y.pdf": "def456"}
class TestLoadLinks:
    """Checks ``_load_links`` against a patched ``_LINKS_FILE``."""

    # Note: yaml is stubbed in the unit setup (see top of file), so
    # _load_links is only tested with an existing vs. a missing file. The
    # real YAML parsing logic is validated in the integration suite against
    # the real links.yaml.

    def test_missing_yaml_returns_empty(self, tmp_path):
        """A links file that does not exist yields an empty dict."""
        from app.wahlprogramm_fetch import _load_links

        absent = tmp_path / "missing.yaml"
        with patch("app.wahlprogramm_fetch._LINKS_FILE", absent):
            loaded = _load_links()
        assert loaded == {}

    def test_empty_yaml_returns_empty(self, tmp_path):
        """An existing but empty links file yields an empty dict."""
        from app.wahlprogramm_fetch import _load_links

        blank = tmp_path / "empty.yaml"
        blank.write_text("")
        with patch("app.wahlprogramm_fetch._LINKS_FILE", blank):
            loaded = _load_links()
        assert loaded == {}
class TestGetMissingProgrammes:
    """Tests for get_missing_programmes — lists state/party combinations that
    have a candidate URL but whose local file is missing. yaml is stubbed; the
    tests therefore patch _load_links directly."""

    def test_no_yaml_returns_empty(self):
        """No link data at all → nothing is reported as missing."""
        from app.wahlprogramm_fetch import get_missing_programmes

        with patch("app.wahlprogramm_fetch._load_links", return_value={}):
            reported = get_missing_programmes()
        assert reported == []

    def test_lists_entries_when_file_missing(self, tmp_path):
        """Entry in YAML, registered WAHLPROGRAMME file missing → listed."""
        from app.wahlprogramm_fetch import get_missing_programmes

        fake_links = {"BX": {"XYZ": [{"url": "https://example.com/x.pdf"}]}}
        with patch("app.wahlprogramm_fetch._load_links", return_value=fake_links), \
                patch("app.wahlprogramm_fetch._REFERENZEN_DIR", tmp_path / "ref"):
            reported = get_missing_programmes()
        reported_codes = [entry["bl"] for entry in reported]
        assert "BX" in reported_codes

    def test_bundesland_filter(self, tmp_path):
        """With ``bundesland="BX"`` only entries for ``BX`` are reported."""
        from app.wahlprogramm_fetch import get_missing_programmes

        fake_links = {
            "BX": {"XYZ": [{"url": "https://example.com/x.pdf"}]},
            "BY": {"ABC": [{"url": "https://example.com/y.pdf"}]},
        }
        with patch("app.wahlprogramm_fetch._load_links", return_value=fake_links), \
                patch("app.wahlprogramm_fetch._REFERENZEN_DIR", tmp_path / "ref"):
            reported = get_missing_programmes(bundesland="BX")
        reported_codes = {entry["bl"] for entry in reported}
        assert reported_codes == {"BX"}
# ---------------------------------------------------------------------------
# Test 6: og_card — cache_key Determinismus und Cache-Miss/Hit
# ---------------------------------------------------------------------------
class TestOgCacheKey:
    """Checks the og_card cache key and the ``get_cached`` file lookup."""

    _DRUCKSACHE = "NRW-18/1234"
    _STAMP = "2026-04-20T10:00:00"

    def test_same_inputs_same_key(self):
        """Identical inputs produce identical keys."""
        first = og_cache_key(self._DRUCKSACHE, self._STAMP)
        second = og_cache_key(self._DRUCKSACHE, self._STAMP)
        assert first == second

    def test_different_updated_at_different_key(self):
        """A different ``updated_at`` value produces a different key."""
        earlier = og_cache_key(self._DRUCKSACHE, self._STAMP)
        later = og_cache_key(self._DRUCKSACHE, "2026-04-21T10:00:00")
        assert earlier != later

    def test_key_length_16(self):
        """The key is 16 characters long."""
        assert len(og_cache_key(self._DRUCKSACHE, self._STAMP)) == 16

    def test_cache_miss_when_file_absent(self, tmp_path):
        """An empty cache directory yields ``None``."""
        from app.og_card import get_cached

        found = get_cached("NRW-18/9999", "2026-01-01T00:00:00", cache_dir=tmp_path)
        assert found is None

    def test_cache_hit_when_file_present(self, tmp_path):
        """A file named ``<safe>_<key>.png`` in the cache directory is returned."""
        from app.og_card import get_cached

        drucksache = "NRW-18/9999"
        updated_at = "2026-01-01T00:00:00"
        key = og_cache_key(drucksache, updated_at)
        # Slashes and spaces in the document id become underscores.
        safe = drucksache.translate(str.maketrans("/ ", "__"))
        cached_file = tmp_path / f"{safe}_{key}.png"
        cached_file.write_bytes(b"\x89PNG")
        found = get_cached(drucksache, updated_at, cache_dir=tmp_path)
        assert found == cached_file