"""Tests für wahlprogramm_fetch.py (#138) — SHA-Gate und Kandidaten-Suche.""" from __future__ import annotations import hashlib import sys import types from pathlib import Path from unittest.mock import MagicMock, patch import pytest # --------------------------------------------------------------------------- # Stub yaml, damit der Import ohne PyPI-Paket läuft # --------------------------------------------------------------------------- if "yaml" not in sys.modules: _yaml_mod = types.ModuleType("yaml") def _safe_load(fh): return {} _yaml_mod.safe_load = _safe_load sys.modules["yaml"] = _yaml_mod from app.wahlprogramm_fetch import ( fetch_and_verify, sha256_of_file, suggest_candidates, ) from app.og_card import cache_key as og_cache_key # --------------------------------------------------------------------------- # Hilfsfunktion: deterministische SHA-256 einer Inline-Byte-Folge # --------------------------------------------------------------------------- def _sha(data: bytes) -> str: return hashlib.sha256(data).hexdigest() # --------------------------------------------------------------------------- # Test 1: suggest_candidates — YAML-Lücke liefert leere Liste # --------------------------------------------------------------------------- class TestSuggestCandidates: def test_returns_empty_when_no_yaml_entry(self): """BL/Partei ohne YAML-Eintrag → leere Liste, kein Fehler.""" with patch("app.wahlprogramm_fetch._load_links", return_value={}): result = suggest_candidates("NRW", "BSW") assert result == [] def test_returns_list_for_known_entry(self): """Bekannter Eintrag aus YAML → Liste mit mindestens einem Dict.""" fake = { "NRW": { "BSW": [{"url": "https://example.com/bsw.pdf", "titel": "BSW-Programm", "jahr": 2022}] } } with patch("app.wahlprogramm_fetch._load_links", return_value=fake): result = suggest_candidates("NRW", "BSW") assert len(result) == 1 assert result[0]["url"] == "https://example.com/bsw.pdf" def test_single_dict_is_wrapped_in_list(self): """Ein einzelnes Dict (statt Liste) wird transparent als Liste geliefert.""" fake = { "NRW": { "PIRATEN": {"url": "https://example.com/pir.pdf", "titel": "Piraten", "jahr": 2022} } } with patch("app.wahlprogramm_fetch._load_links", return_value=fake): result = suggest_candidates("NRW", "PIRATEN") assert isinstance(result, list) assert result[0]["url"] == "https://example.com/pir.pdf" # --------------------------------------------------------------------------- # Test 2: sha256_of_file — korrekte Berechnung # --------------------------------------------------------------------------- class TestSha256OfFile: def test_matches_hashlib_direct(self, tmp_path): """SHA-256 der Funktion stimmt mit direktem hashlib-Ergebnis überein.""" data = b"Gemeinwohl-\xc3\x96konomie" p = tmp_path / "test.bin" p.write_bytes(data) assert sha256_of_file(p) == _sha(data) # --------------------------------------------------------------------------- # Test 3: fetch_and_verify — Download-Stub ohne echte HTTP-Verbindung # --------------------------------------------------------------------------- class TestFetchAndVerify: def _fake_urlopen(self, url_or_req, timeout=None): """Gibt ein kontextmanager-kompatibles Fake-Response-Objekt zurück.""" content = b"%PDF-1.4 fake-content" class _FakeResp: def read(self): return content def __enter__(self): return self def __exit__(self, *a): pass return _FakeResp() def test_download_new_file(self, tmp_path): """Neue Datei wird korrekt heruntergeladen und gespeichert.""" dest = tmp_path / "test.pdf" with patch("urllib.request.urlopen", self._fake_urlopen): result = fetch_and_verify("https://example.com/test.pdf", dest) assert result["ok"] is True assert result["changed"] is True assert dest.exists() assert result["sha256"] == _sha(b"%PDF-1.4 fake-content") def test_unchanged_file_not_overwritten(self, tmp_path): """Bereits vorhandene identische Datei wird nicht erneut gespeichert.""" content = b"%PDF-1.4 fake-content" dest = tmp_path / "test.pdf" dest.write_bytes(content) with patch("urllib.request.urlopen", self._fake_urlopen): result = fetch_and_verify("https://example.com/test.pdf", dest) assert result["ok"] is True assert result["changed"] is False def test_sha_gate_rejects_wrong_hash(self, tmp_path): """Falscher expected_sha → Datei wird nicht gespeichert, ok=False.""" dest = tmp_path / "test.pdf" wrong_sha = "a" * 64 with patch("urllib.request.urlopen", self._fake_urlopen): result = fetch_and_verify("https://example.com/test.pdf", dest, expected_sha=wrong_sha) assert result["ok"] is False assert not dest.exists() assert "SHA" in (result["error"] or "") def test_network_error_returns_ok_false(self, tmp_path): """Netzwerkfehler → ok=False, kein unkontrollierter Absturz.""" dest = tmp_path / "test.pdf" def _raise(*a, **kw): raise OSError("Connection refused") with patch("urllib.request.urlopen", _raise): result = fetch_and_verify("https://example.com/test.pdf", dest) assert result["ok"] is False assert not dest.exists() def test_prev_sha_captured_before_overwrite(self, tmp_path): """prev_sha256 wird korrekt gesetzt, wenn die Datei vorher vorhanden war.""" old_content = b"old-version" dest = tmp_path / "test.pdf" dest.write_bytes(old_content) old_sha = _sha(old_content) new_content = b"%PDF-1.4 fake-content" def _new_urlopen(url_or_req, timeout=None): class _R: def read(self): return new_content def __enter__(self): return self def __exit__(self, *a): pass return _R() with patch("urllib.request.urlopen", _new_urlopen): result = fetch_and_verify("https://example.com/test.pdf", dest) assert result["prev_sha256"] == old_sha assert result["changed"] is True # --------------------------------------------------------------------------- # Test 4: SHA-Lock-File — Pferdetausch-Schutz (#138) # --------------------------------------------------------------------------- class TestShaLock: """Regression: abgeordnetenwatch hat das CDU-BE-2023-PDF unter dem alten Slug-Namen gegen das CDU-BE-2026-PDF ersetzt. Der Lock-File-Mechanismus muss solche stillen Tausch-Aktionen abfangen.""" def _patch_lock_file(self, tmp_path): """Setzt den Lock-File-Pfad auf einen tmp-Pfad fuer den Test.""" return patch("app.wahlprogramm_fetch._LOCK_FILE", tmp_path / "lock.json") def _urlopen_with(self, content: bytes): def _u(url_or_req, timeout=None): class _R: def read(self_inner): return content def __enter__(self_inner): return self_inner def __exit__(self_inner, *a): pass return _R() return _u def test_first_download_pins_sha(self, tmp_path): """Erster Download → Lock-File wird angelegt mit dem neuen SHA.""" dest = tmp_path / "cdu-be.pdf" content = b"%PDF original CDU BE 2021" with self._patch_lock_file(tmp_path), \ patch("urllib.request.urlopen", self._urlopen_with(content)): result = fetch_and_verify("https://example.com/cdu-be.pdf", dest) assert result["ok"] is True assert result["lock_updated"] is True lock_path = tmp_path / "lock.json" assert lock_path.exists() import json lock = json.loads(lock_path.read_text()) assert lock["cdu-be.pdf"] == _sha(content) def test_second_download_with_same_content_passes(self, tmp_path): """Zweiter Download mit gleichem Inhalt → ok, changed=False.""" dest = tmp_path / "cdu-be.pdf" content = b"%PDF original CDU BE 2021" dest.write_bytes(content) # Lock vorbereiten import json (tmp_path / "lock.json").write_text(json.dumps({"cdu-be.pdf": _sha(content)})) with self._patch_lock_file(tmp_path), \ patch("urllib.request.urlopen", self._urlopen_with(content)): result = fetch_and_verify("https://example.com/cdu-be.pdf", dest) assert result["ok"] is True assert result["changed"] is False def test_pferdetausch_blocks_silent_replacement(self, tmp_path): """KRITISCH: lokal liegt 'CDU BE 2021', Server liefert 'CDU BE 2026'. Lock zeigt SHA von 2021 → fetch muss ABBRECHEN, nicht ueberschreiben.""" dest = tmp_path / "cdu-be-2023.pdf" original_content = b"%PDF CDU Berlin 2021-2026 Wahlprogramm" replaced_content = b"%PDF CDU Berlin-Plan 2026 (replaced!)" dest.write_bytes(original_content) # Lock pinnt den Original-SHA import json (tmp_path / "lock.json").write_text( json.dumps({"cdu-be-2023.pdf": _sha(original_content)}) ) with self._patch_lock_file(tmp_path), \ patch("urllib.request.urlopen", self._urlopen_with(replaced_content)): result = fetch_and_verify("https://example.com/cdu-be-2023.pdf", dest) assert result["ok"] is False assert "Lock-Pruefung" in result["error"] # Datei darf NICHT ueberschrieben sein assert dest.read_bytes() == original_content def test_accept_new_sha_overrides_lock(self, tmp_path): """Mit accept_new_sha=True wird der Lock bewusst aktualisiert.""" dest = tmp_path / "linke-bb.pdf" original_content = b"%PDF v1" new_content = b"%PDF v2 - intentional update" dest.write_bytes(original_content) import json (tmp_path / "lock.json").write_text( json.dumps({"linke-bb.pdf": _sha(original_content)}) ) with self._patch_lock_file(tmp_path), \ patch("urllib.request.urlopen", self._urlopen_with(new_content)): result = fetch_and_verify( "https://example.com/linke-bb.pdf", dest, accept_new_sha=True, ) assert result["ok"] is True assert result["changed"] is True # Lock muss neuen SHA haben lock = json.loads((tmp_path / "lock.json").read_text()) assert lock["linke-bb.pdf"] == _sha(new_content) def test_existing_file_without_lock_pins_silently(self, tmp_path): """File ist da aber Lock fehlt (Migration-Szenario): bei naechstem identischen fetch wird der SHA gepinnt, kein Block.""" dest = tmp_path / "spd-mv.pdf" content = b"%PDF SPD MV 2021" dest.write_bytes(content) # Kein Lock-Eintrag with self._patch_lock_file(tmp_path), \ patch("urllib.request.urlopen", self._urlopen_with(content)): result = fetch_and_verify("https://example.com/spd-mv.pdf", dest) assert result["ok"] is True assert result["lock_updated"] is True import json lock = json.loads((tmp_path / "lock.json").read_text()) assert lock["spd-mv.pdf"] == _sha(content) # --------------------------------------------------------------------------- # Test 4: og_card — cache_key Determinismus und Cache-Miss/Hit # --------------------------------------------------------------------------- class TestOgCacheKey: def test_same_inputs_same_key(self): k1 = og_cache_key("NRW-18/1234", "2026-04-20T10:00:00") k2 = og_cache_key("NRW-18/1234", "2026-04-20T10:00:00") assert k1 == k2 def test_different_updated_at_different_key(self): k1 = og_cache_key("NRW-18/1234", "2026-04-20T10:00:00") k2 = og_cache_key("NRW-18/1234", "2026-04-21T10:00:00") assert k1 != k2 def test_key_length_16(self): k = og_cache_key("NRW-18/1234", "2026-04-20T10:00:00") assert len(k) == 16 def test_cache_miss_when_file_absent(self, tmp_path): from app.og_card import get_cached result = get_cached("NRW-18/9999", "2026-01-01T00:00:00", cache_dir=tmp_path) assert result is None def test_cache_hit_when_file_present(self, tmp_path): from app.og_card import get_cached, cache_key as ck drucksache = "NRW-18/9999" updated_at = "2026-01-01T00:00:00" key = ck(drucksache, updated_at) safe = drucksache.replace("/", "_").replace(" ", "_") p = tmp_path / f"{safe}_{key}.png" p.write_bytes(b"\x89PNG") result = get_cached(drucksache, updated_at, cache_dir=tmp_path) assert result == p