gwoe-antragspruefer/tests/test_wahlprogramm_fetch.py
Dotty Dotter 2902164eff test: 467 -> 574 Tests (+107) — DDD, abgeordnetenwatch, monitoring, v2, Bug-Regressions
Neue Tests in dieser Migration:
- test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins)
- test_clustering.py (82% Coverage)
- test_drucksache_typen.py (100%)
- test_mail.py (86%)
- test_monitoring.py (23 Tests)
- test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction)
- test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker)
- test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel)
- test_embeddings_v3_v4.py (WRITE/READ-Pattern)
- test_wahlprogramm_check.py (#128)
- test_wahlprogramm_fetch.py (#138)
- test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD)
- test_domain_behavior.py (5 Domain-Methoden boundary tests)
- tests/e2e/test_ui.py (Playwright)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 20:55:57 +02:00

214 lines
7.9 KiB
Python

"""Tests für wahlprogramm_fetch.py (#138) — SHA-Gate und Kandidaten-Suche."""
from __future__ import annotations
import hashlib
import sys
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Stub yaml, damit der Import ohne PyPI-Paket läuft
# ---------------------------------------------------------------------------
# Install the stub only when PyYAML is genuinely unavailable. Checking
# ``sys.modules`` alone would also replace an installed-but-not-yet-imported
# PyYAML, and because ``sys.modules`` is process-global the stub would then
# be seen by every module imported afterwards.
try:
    import yaml  # noqa: F401  (imported only to probe availability)
except ImportError:
    _yaml_mod = types.ModuleType("yaml")

    def _safe_load(fh):
        """Stand-in for ``yaml.safe_load``: ignore the input, return an empty dict."""
        return {}

    _yaml_mod.safe_load = _safe_load
    sys.modules["yaml"] = _yaml_mod
from app.wahlprogramm_fetch import (
fetch_and_verify,
sha256_of_file,
suggest_candidates,
)
from app.og_card import cache_key as og_cache_key
# ---------------------------------------------------------------------------
# Hilfsfunktion: deterministische SHA-256 einer Inline-Byte-Folge
# ---------------------------------------------------------------------------
def _sha(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
# ---------------------------------------------------------------------------
# Test 1: suggest_candidates — YAML-Lücke liefert leere Liste
# ---------------------------------------------------------------------------
class TestSuggestCandidates:
    """Checks ``suggest_candidates`` against a patched ``_load_links`` table."""

    @staticmethod
    def _lookup(links, bl, partei):
        """Call ``suggest_candidates(bl, partei)`` while ``_load_links`` returns *links*."""
        with patch("app.wahlprogramm_fetch._load_links") as load_links:
            load_links.return_value = links
            return suggest_candidates(bl, partei)

    def test_returns_empty_when_no_yaml_entry(self):
        """State/party without a YAML entry → empty list, no error."""
        candidates = self._lookup({}, "NRW", "BSW")
        assert candidates == []

    def test_returns_list_for_known_entry(self):
        """Known YAML entry → list containing exactly the configured dict."""
        entry = {"url": "https://example.com/bsw.pdf", "titel": "BSW-Programm", "jahr": 2022}
        links = {"NRW": {"BSW": [entry]}}
        candidates = self._lookup(links, "NRW", "BSW")
        assert len(candidates) == 1
        assert candidates[0]["url"] == "https://example.com/bsw.pdf"

    def test_single_dict_is_wrapped_in_list(self):
        """A bare dict (instead of a list) is transparently delivered as a list."""
        entry = {"url": "https://example.com/pir.pdf", "titel": "Piraten", "jahr": 2022}
        links = {"NRW": {"PIRATEN": entry}}
        candidates = self._lookup(links, "NRW", "PIRATEN")
        assert isinstance(candidates, list)
        assert candidates[0]["url"] == "https://example.com/pir.pdf"
# ---------------------------------------------------------------------------
# Test 2: sha256_of_file — korrekte Berechnung
# ---------------------------------------------------------------------------
class TestSha256OfFile:
    """Checks ``sha256_of_file`` against a digest computed directly with hashlib."""

    def test_matches_hashlib_direct(self, tmp_path):
        """The function's SHA-256 equals the direct hashlib result for the same bytes."""
        payload = b"Gemeinwohl-\xc3\x96konomie"
        target = tmp_path / "test.bin"
        target.write_bytes(payload)
        expected = _sha(payload)
        assert sha256_of_file(target) == expected
# ---------------------------------------------------------------------------
# Test 3: fetch_and_verify — Download-Stub ohne echte HTTP-Verbindung
# ---------------------------------------------------------------------------
class TestFetchAndVerify:
    """Checks ``fetch_and_verify`` with ``urllib.request.urlopen`` replaced by stubs.

    No real HTTP connection is made: every test patches ``urlopen`` with a
    callable that either serves fixed bytes or raises.
    """

    # Bytes served by the default fake download and the URL used throughout.
    _PAYLOAD = b"%PDF-1.4 fake-content"
    _URL = "https://example.com/test.pdf"

    @staticmethod
    def _urlopen_serving(payload):
        """Build a ``urlopen`` replacement whose response yields *payload*."""

        class _Response:
            # Context-manager-compatible response object with a ``read`` method.
            def read(self):
                return payload

            def __enter__(self):
                return self

            def __exit__(self, *exc_info):
                # Returns None, so exceptions raised inside the ``with`` propagate.
                return None

        def _urlopen(url_or_req, timeout=None):
            return _Response()

        return _urlopen

    def _fake_urlopen(self, url_or_req, timeout=None):
        """Return a context-manager-compatible fake response serving ``_PAYLOAD``."""
        serve = self._urlopen_serving(self._PAYLOAD)
        return serve(url_or_req, timeout=timeout)

    def test_download_new_file(self, tmp_path):
        """A new file is downloaded and stored correctly."""
        target = tmp_path / "test.pdf"
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is True
        assert outcome["changed"] is True
        assert target.exists()
        assert outcome["sha256"] == _sha(self._PAYLOAD)

    def test_unchanged_file_not_overwritten(self, tmp_path):
        """An already present, identical file is reported as unchanged."""
        target = tmp_path / "test.pdf"
        target.write_bytes(self._PAYLOAD)
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is True
        assert outcome["changed"] is False

    def test_sha_gate_rejects_wrong_hash(self, tmp_path):
        """Wrong expected_sha → file is not stored, ok=False."""
        target = tmp_path / "test.pdf"
        bogus_sha = "a" * 64
        with patch("urllib.request.urlopen", self._fake_urlopen):
            outcome = fetch_and_verify(self._URL, target, expected_sha=bogus_sha)
        assert outcome["ok"] is False
        assert not target.exists()
        # ``error`` may be None, hence the fallback to an empty string.
        assert "SHA" in (outcome["error"] or "")

    def test_network_error_returns_ok_false(self, tmp_path):
        """Network error → ok=False, no uncontrolled crash."""
        target = tmp_path / "test.pdf"
        failure = OSError("Connection refused")
        with patch("urllib.request.urlopen", side_effect=failure):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["ok"] is False
        assert not target.exists()

    def test_prev_sha_captured_before_overwrite(self, tmp_path):
        """prev_sha256 is set correctly when the file existed beforehand."""
        stale = b"old-version"
        target = tmp_path / "test.pdf"
        target.write_bytes(stale)
        stale_sha = _sha(stale)
        fresh_urlopen = self._urlopen_serving(b"%PDF-1.4 fake-content")
        with patch("urllib.request.urlopen", fresh_urlopen):
            outcome = fetch_and_verify(self._URL, target)
        assert outcome["prev_sha256"] == stale_sha
        assert outcome["changed"] is True
# ---------------------------------------------------------------------------
# Test 4: og_card — cache_key Determinismus und Cache-Miss/Hit
# ---------------------------------------------------------------------------
class TestOgCacheKey:
    """Checks the og_card cache key for determinism and ``get_cached`` for miss/hit."""

    def test_same_inputs_same_key(self):
        """Identical inputs produce identical keys."""
        args = ("NRW-18/1234", "2026-04-20T10:00:00")
        first = og_cache_key(*args)
        second = og_cache_key(*args)
        assert first == second

    def test_different_updated_at_different_key(self):
        """Changing only the updated_at value changes the key."""
        before = og_cache_key("NRW-18/1234", "2026-04-20T10:00:00")
        after = og_cache_key("NRW-18/1234", "2026-04-21T10:00:00")
        assert before != after

    def test_key_length_16(self):
        """The key has a length of 16."""
        assert len(og_cache_key("NRW-18/1234", "2026-04-20T10:00:00")) == 16

    def test_cache_miss_when_file_absent(self, tmp_path):
        """``get_cached`` returns None when the cache directory holds no file."""
        from app.og_card import get_cached

        assert get_cached("NRW-18/9999", "2026-01-01T00:00:00", cache_dir=tmp_path) is None

    def test_cache_hit_when_file_present(self, tmp_path):
        """``get_cached`` returns the path of an existing ``<safe>_<key>.png`` file."""
        from app.og_card import get_cached

        drucksache, updated_at = "NRW-18/9999", "2026-01-01T00:00:00"
        # Slashes and spaces in the identifier become underscores in the file name.
        safe_name = drucksache.translate(str.maketrans("/ ", "__"))
        digest = og_cache_key(drucksache, updated_at)
        cached_file = tmp_path / f"{safe_name}_{digest}.png"
        cached_file.write_bytes(b"\x89PNG")
        assert get_cached(drucksache, updated_at, cache_dir=tmp_path) == cached_file