From 2902164eff1a8a1887e6012a2e4176b66e1b6def Mon Sep 17 00:00:00 2001 From: Dotty Dotter Date: Sat, 25 Apr 2026 20:55:57 +0200 Subject: [PATCH] =?UTF-8?q?test:=20467=20->=20574=20Tests=20(+107)=20?= =?UTF-8?q?=E2=80=94=20DDD,=20abgeordnetenwatch,=20monitoring,=20v2,=20Bug?= =?UTF-8?q?-Regressions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/e2e/__init__.py | 0 tests/e2e/test_ui.py | 210 +++++++++++ tests/test_abgeordnetenwatch.py | 405 ++++++++++++++++++++ tests/test_abonnement_repository.py | 95 +++++ tests/test_antrag_repository.py | 158 ++++++++ tests/test_auth.py | 111 +++++- tests/test_bewertung_repository.py | 49 +++ tests/test_bug_regressions.py | 408 ++++++++++++++++++++ tests/test_clustering.py | 438 ++++++++++++++++++++++ tests/test_database.py | 554 ++++++++++++++++++++++++++++ tests/test_domain_behavior.py | 154 ++++++++ tests/test_drucksache_typen.py | 204 ++++++++++ tests/test_embeddings_v3_v4.py | 266 +++++++++++++ tests/test_llm_bewerter.py | 137 +++++++ tests/test_mail.py | 354 ++++++++++++++++++ tests/test_monitoring.py | 353 ++++++++++++++++++ tests/test_parlamente.py | 236 ++++++++++++ tests/test_redline_parser.py | 145 ++++++++ tests/test_wahlprogramm_check.py | 69 ++++ tests/test_wahlprogramm_fetch.py | 213 +++++++++++ 20 files changed, 4557 insertions(+), 2 deletions(-) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/test_ui.py create mode 100644 tests/test_abgeordnetenwatch.py create mode 100644 tests/test_abonnement_repository.py create mode 100644 tests/test_antrag_repository.py create mode 100644 tests/test_bewertung_repository.py create mode 100644 tests/test_bug_regressions.py create mode 100644 tests/test_clustering.py create mode 100644 tests/test_database.py create mode 100644 tests/test_domain_behavior.py create mode 100644 tests/test_drucksache_typen.py create mode 100644 tests/test_embeddings_v3_v4.py create mode 100644 tests/test_llm_bewerter.py create mode 100644 tests/test_mail.py create mode 100644 tests/test_monitoring.py create mode 100644 tests/test_redline_parser.py create mode 100644 tests/test_wahlprogramm_check.py create mode 100644 tests/test_wahlprogramm_fetch.py diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/e2e/test_ui.py b/tests/e2e/test_ui.py new file mode 100644 index 0000000..a03a223 --- /dev/null +++ b/tests/e2e/test_ui.py @@ -0,0 +1,210 @@ +"""E2E UI tests with Playwright/Chromium (#120). + +Run: pytest tests/e2e/ -m e2e --headed (with browser) + pytest tests/e2e/ -m e2e (headless) + +Requires: pip install playwright && playwright install chromium +Target: https://gwoe.toppyr.de (live) +""" + +import pytest + +BASE_URL = "https://gwoe.toppyr.de" + +pytestmark = pytest.mark.e2e + + +@pytest.fixture(scope="module") +def browser(): + from playwright.sync_api import sync_playwright + pw = sync_playwright().start() + br = pw.chromium.launch(headless=True) + yield br + br.close() + pw.stop() + + +@pytest.fixture +def page(browser): + p = browser.new_page() + yield p + p.close() + + +# ─── Page Load ──────────────────────────────────────────────────────── + +def test_main_page_loads(page): + page.goto(BASE_URL) + page.wait_for_timeout(3000) + assert page.title() == "GWÖ-Antragsprüfer" + + +def test_assessments_render(page): + page.goto(BASE_URL) + page.wait_for_timeout(3000) + items = page.locator(".list-item").count() + assert items >= 1, "Keine Assessments in der Liste" + + +def test_no_js_errors(page): + errors = [] + page.on("pageerror", lambda err: errors.append(err.message)) + page.goto(BASE_URL) + page.wait_for_timeout(3000) + assert not errors, f"JS-Fehler: {errors}" + + +# ─── Navigation ────────────────────────────────────────────────────── + +def test_detail_loads_on_click(page): + page.goto(BASE_URL) + page.wait_for_timeout(3000) + first = page.locator(".list-item").first + first.click() + page.wait_for_timeout(2000) + detail = page.locator("#detail-panel").inner_text() + assert len(detail) > 50, "Detail-Panel leer nach Klick" + + +def test_keyboard_navigation(page): + page.goto(BASE_URL) + page.wait_for_timeout(3000) + # j selects first item + page.keyboard.press("j") + active = page.locator(".list-item.active").count() + assert active == 1, "j-Taste markiert kein Item" + # k goes back (stays on first) + page.keyboard.press("k") + assert page.locator(".list-item.active").count() == 1 + + +def test_search(page): + page.goto(BASE_URL) + page.wait_for_timeout(3000) + search = page.locator("#search-input") + search.fill("Bildung") + page.wait_for_timeout(1000) + # Should still show items (or search results) + items = page.locator(".list-item").count() + assert items >= 0 # search may return 0, but shouldn't error + + +# ─── Subpages ──────────────────────────────────────────────────────── + +def test_impressum_page(page): + page.goto(f"{BASE_URL}/impressum") + page.wait_for_timeout(1000) + assert "Angaben" in page.content() + + +def test_datenschutz_page(page): + page.goto(f"{BASE_URL}/datenschutz") + page.wait_for_timeout(1000) + assert "Datenschutz" in page.content() + + +def test_methodik_page(page): + page.goto(f"{BASE_URL}/methodik") + page.wait_for_timeout(1000) + assert "Methodik" in page.content() + + +def test_auswertungen_page(page): + errors = [] + page.on("pageerror", lambda err: errors.append(err.message)) + page.goto(f"{BASE_URL}/auswertungen") + page.wait_for_timeout(2000) + assert not errors, f"JS-Fehler auf Auswertungen: {errors}" + + +def test_quellen_page(page): + page.goto(f"{BASE_URL}/quellen") + page.wait_for_timeout(1000) + assert "Wahlprogramm" in page.content() or "Quellen" in page.content() + + +# ─── API ───────────────────────────────────────────────────────────── + +def test_api_assessments(page): + resp = page.request.get(f"{BASE_URL}/api/assessments") + assert resp.ok + data = resp.json() + assert isinstance(data, list) + assert len(data) > 0 + # Lightweight: should NOT contain gwoeMatrix (memory optimization) + assert "gwoeMatrix" not in data[0], "List API should not contain detail fields" + + +def test_api_assessment_detail(page): + list_resp = page.request.get(f"{BASE_URL}/api/assessments") + first = list_resp.json()[0] + ds = first["drucksache"] + resp = page.request.get(f"{BASE_URL}/api/assessment?drucksache={ds}") + assert resp.ok + detail = resp.json() + assert "gwoeMatrix" in detail, "Detail API should contain gwoeMatrix" + assert "gwoeBegründung" in detail + + +def test_api_export_json(page): + resp = page.request.get(f"{BASE_URL}/api/auswertungen/export.json") + assert resp.ok + data = resp.json() + assert "meta" in data + assert data["meta"]["license"] == "CC BY 4.0" + assert len(data["assessments"]) > 0 + + +def test_api_votes(page): + resp = page.request.get(f"{BASE_URL}/api/votes?drucksache=18/8125") + assert resp.ok + data = resp.json() + assert "counts" in data + assert "my_votes" in data + + +def test_api_health(page): + resp = page.request.get(f"{BASE_URL}/health") + assert resp.ok + assert resp.json()["status"] == "ok" + + +# ─── Dark Mode ─────────────────────────────────────────────────────── + +def test_dark_mode_toggle(page): + page.goto(BASE_URL) + page.wait_for_timeout(2000) + # Initially light + theme = page.evaluate("document.documentElement.getAttribute('data-theme')") + assert theme != "dark" + # Toggle via JS (simulates button) + page.evaluate("document.documentElement.setAttribute('data-theme', 'dark')") + theme = page.evaluate("document.documentElement.getAttribute('data-theme')") + assert theme == "dark" + + +# ─── Accessibility ─────────────────────────────────────────────────── + +def test_search_has_aria_label(page): + page.goto(BASE_URL) + page.wait_for_timeout(2000) + label = page.locator("#search-input").get_attribute("aria-label") + assert label, "Suchfeld hat kein aria-label" + + +def test_hamburger_has_aria_label(page): + page.goto(BASE_URL) + page.wait_for_timeout(2000) + btn = page.locator("button[aria-label='Menü öffnen']") + assert btn.count() == 1, "Hamburger-Button hat kein aria-label" + + +def test_focus_visible_indicator(page): + page.goto(BASE_URL) + page.wait_for_timeout(2000) + # Check that :focus-visible style exists + has_focus = page.evaluate("""() => { + const rules = [...document.styleSheets[0].cssRules]; + return rules.some(r => r.selectorText && r.selectorText.includes('focus-visible')); + }""") + assert has_focus, "Kein :focus-visible CSS-Regel gefunden" diff --git a/tests/test_abgeordnetenwatch.py b/tests/test_abgeordnetenwatch.py new file mode 100644 index 0000000..145e51e --- /dev/null +++ b/tests/test_abgeordnetenwatch.py @@ -0,0 +1,405 @@ +"""Tests für app/abgeordnetenwatch.py und die DB-Funktionen (#106 Phase 1).""" + +from __future__ import annotations + +import asyncio +import json +import sys +import types + +import pytest + + +# ─── aiosqlite-Stub-Schutz (analog test_database.py) ──────────────────────── +_aio = sys.modules.get("aiosqlite") +if _aio is not None and not hasattr(_aio, "connect"): + del sys.modules["aiosqlite"] + +import aiosqlite as _real_aiosqlite # noqa: E402 +import importlib as _importlib + +for _mod in ("app.database", "app.abgeordnetenwatch"): + if _mod in sys.modules: + _db_mod = sys.modules[_mod] + if _mod == "app.database" and not hasattr( + getattr(_db_mod, "aiosqlite", None), "connect" + ): + del sys.modules[_mod] + _importlib.import_module(_mod) + else: + _importlib.import_module(_mod) + + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# ─── Fixtures ──────────────────────────────────────────────────────────────── + +@pytest.fixture() +def db_path(tmp_path, monkeypatch): + path = tmp_path / "test_aw.db" + from app.config import settings + monkeypatch.setattr(settings, "db_path", str(path)) + return str(path) + + +@pytest.fixture() +def initialized_db(db_path): + from app import database + run(database.init_db()) + return db_path + + +# ─── extract_drucksache_from_intro ─────────────────────────────────────────── + +class TestExtractDrucksache: + def test_simple_match(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = "

Beratung des Antrags 18/1234 der Fraktion SPD.

" + assert extract_drucksache_from_intro(html) == "18/1234" + + def test_match_in_link(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = 'Drucksache 7/98765' + assert extract_drucksache_from_intro(html) == "7/98765" + + def test_first_match_wins(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = "Antrag 17/100 und 18/200" + assert extract_drucksache_from_intro(html) == "17/100" + + def test_no_match_returns_none(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = "

Kein Bezug auf eine Drucksache hier.

" + assert extract_drucksache_from_intro(html) is None + + def test_empty_string_returns_none(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + assert extract_drucksache_from_intro("") is None + + def test_none_input_returns_none(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + assert extract_drucksache_from_intro(None) is None + + def test_too_short_sequence_not_matched(self): + """Zwei Ziffern reichen nicht aus (min. 3 nach Slash).""" + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = "Seite 3/12 — nicht relevant" + assert extract_drucksache_from_intro(html) is None + + def test_two_digit_wp_number(self): + from app.abgeordnetenwatch import extract_drucksache_from_intro + html = "Bezug: 19/12345" + assert extract_drucksache_from_intro(html) == "19/12345" + + +# ─── PARLIAMENT_ID-Mapping ──────────────────────────────────────────────────── + +class TestParliamentIdMapping: + def test_bt_maps_to_5(self): + from app.abgeordnetenwatch import PARLIAMENT_ID + assert PARLIAMENT_ID["BT"] == 5 + + def test_bund_alias_maps_to_5(self): + from app.abgeordnetenwatch import PARLIAMENT_ID + assert PARLIAMENT_ID["BUND"] == 5 + + def test_nrw_maps_to_4(self): + from app.abgeordnetenwatch import PARLIAMENT_ID + assert PARLIAMENT_ID["NRW"] == 4 + + def test_all_16_bundeslaender_plus_bt_present(self): + from app.abgeordnetenwatch import PARLIAMENT_ID + expected_codes = { + "BT", "NRW", "BE", "HH", "BW", "RP", "LSA", "MV", + "HB", "HE", "NI", "BY", "SL", "TH", "BB", "SN", "SH", + } + assert expected_codes <= set(PARLIAMENT_ID.keys()) + + +# ─── fetch_polls — Stub-Test via httpx mock ─────────────────────────────────── + +class TestFetchPollsStub: + def test_fetch_polls_parses_response(self, monkeypatch): + """Stub-Test: httpx.AsyncClient.get gibt eine Fake-API-Antwort zurück.""" + import httpx + from app import abgeordnetenwatch as aw_mod + + fake_polls = [ + { + "id": 42, + "label": "Testabstimmung 1", + "field_poll_date": "2026-04-01", + "field_accepted": True, + "field_topics": [{"label": "Klimaschutz"}], + "field_intro": "

Antrag 18/999 der Fraktion GRÜNE

", + "field_legislature": {"label": "18. Wahlperiode"}, + } + ] + + class FakeResponse: + def raise_for_status(self): pass + def json(self): return {"data": fake_polls} + + class FakeClient: + async def __aenter__(self): return self + async def __aexit__(self, *a): pass + async def get(self, url, params=None): return FakeResponse() + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: FakeClient()) + + polls = run(aw_mod.fetch_polls("NRW", limit=10)) + assert len(polls) == 1 + assert polls[0]["id"] == 42 + assert polls[0]["drucksache"] == "18/999" + assert polls[0]["field_accepted"] is True + + def test_fetch_polls_unknown_bundesland_raises(self): + from app.abgeordnetenwatch import fetch_polls + with pytest.raises(ValueError, match="Unbekannter BL-Code"): + run(fetch_polls("XX")) + + def test_fetch_votes_parses_response(self, monkeypatch): + """Stub-Test: Votes werden korrekt geparst.""" + import httpx + from app import abgeordnetenwatch as aw_mod + + fake_votes = [ + { + "mandate": { + "id": 101, + "label": "Erika Mustermann", + "party": {"label": "SPD"}, + }, + "vote": "yes", + }, + { + "mandate": { + "id": 102, + "label": "Max Muster", + "party": {"label": "CDU"}, + }, + "vote": "no", + }, + ] + + class FakeResponse: + def raise_for_status(self): pass + def json(self): return {"data": fake_votes} + + class FakeClient: + async def __aenter__(self): return self + async def __aexit__(self, *a): pass + async def get(self, url, params=None): return FakeResponse() + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: FakeClient()) + + votes = run(aw_mod.fetch_votes_for_poll(42)) + assert len(votes) == 2 + assert votes[0]["politician_id"] == 101 + assert votes[0]["vote"] == "yes" + assert votes[0]["partei"] == "SPD" + assert votes[1]["vote"] == "no" + + def test_fetch_votes_unknown_vote_value_becomes_no_show(self, monkeypatch): + import httpx + from app import abgeordnetenwatch as aw_mod + + fake_votes = [{"mandate": {"id": 1, "label": "X"}, "vote": "gibberish"}] + + class FakeResponse: + def raise_for_status(self): pass + def json(self): return {"data": fake_votes} + + class FakeClient: + async def __aenter__(self): return self + async def __aexit__(self, *a): pass + async def get(self, url, params=None): return FakeResponse() + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: FakeClient()) + + votes = run(aw_mod.fetch_votes_for_poll(99)) + assert votes[0]["vote"] == "no_show" + + +# ─── DB-Upsert-Round-Trip ───────────────────────────────────────────────────── + +class TestDbUpsertRoundTrip: + def test_upsert_poll_new(self, initialized_db): + from app import database + is_new = run(database.upsert_aw_poll( + poll_id=1001, + parliament_id=4, + bundesland="NRW", + drucksache="18/500", + titel="Test-Abstimmung", + datum="2026-04-01", + accepted=True, + topics=["Klimaschutz"], + legislature_label="18. Wahlperiode", + synced_at="2026-04-20T10:00:00", + )) + assert is_new is True + + def test_upsert_poll_update_returns_false(self, initialized_db): + from app import database + run(database.upsert_aw_poll( + poll_id=1002, parliament_id=4, bundesland="NRW", + drucksache="18/501", titel="Alt", datum="2026-03-01", + accepted=False, topics=[], legislature_label="", + synced_at="2026-04-20T10:00:00", + )) + is_new = run(database.upsert_aw_poll( + poll_id=1002, parliament_id=4, bundesland="NRW", + drucksache="18/501", titel="Neu", datum="2026-03-01", + accepted=True, topics=[], legislature_label="", + synced_at="2026-04-20T11:00:00", + )) + assert is_new is False + + def test_upsert_vote_new(self, initialized_db): + from app import database + run(database.upsert_aw_poll( + poll_id=2001, parliament_id=4, bundesland="NRW", + drucksache=None, titel="V-Test", datum="2026-04-01", + accepted=True, topics=[], legislature_label="", + synced_at="2026-04-20T10:00:00", + )) + is_new = run(database.upsert_aw_vote( + poll_id=2001, politician_id=999, + politician_name="Test Politiker", + partei="SPD", + vote="yes", + )) + assert is_new is True + + def test_upsert_vote_update_returns_false(self, initialized_db): + from app import database + run(database.upsert_aw_poll( + poll_id=2002, parliament_id=4, bundesland="NRW", + drucksache=None, titel="V-Test2", datum="2026-04-01", + accepted=True, topics=[], legislature_label="", + synced_at="2026-04-20T10:00:00", + )) + run(database.upsert_aw_vote(2002, 888, "Name", "CDU", "no")) + is_new = run(database.upsert_aw_vote(2002, 888, "Name", "CDU", "yes")) + assert is_new is False + + def test_get_abstimmungsverhalten_returns_none_for_missing(self, initialized_db): + from app import database + result = run(database.get_abstimmungsverhalten("99/9999")) + assert result is None + + def test_get_abstimmungsverhalten_aggregates(self, initialized_db): + from app import database + # Poll anlegen + run(database.upsert_aw_poll( + poll_id=3001, parliament_id=4, bundesland="NRW", + drucksache="18/3001", titel="AggTest", datum="2026-04-10", + accepted=True, topics=[], legislature_label="", + synced_at="2026-04-20T10:00:00", + )) + # Votes: 2× SPD yes, 1× CDU no, 1× CDU abstain + run(database.upsert_aw_vote(3001, 1, "A", "SPD", "yes")) + run(database.upsert_aw_vote(3001, 2, "B", "SPD", "yes")) + run(database.upsert_aw_vote(3001, 3, "C", "CDU", "no")) + run(database.upsert_aw_vote(3001, 4, "D", "CDU", "abstain")) + + result = run(database.get_abstimmungsverhalten("18/3001")) + assert result is not None + assert result["accepted"] is True + fraktionen = {f["partei"]: f for f in result["fraktionen"]} + assert fraktionen["SPD"]["yes"] == 2 + assert fraktionen["CDU"]["no"] == 1 + assert fraktionen["CDU"]["abstain"] == 1 + + def test_aw_tables_created_by_init_db(self, db_path): + """init_db legt abgeordnetenwatch_polls und _votes an.""" + import aiosqlite + from app import database + run(database.init_db()) + + async def check(): + async with aiosqlite.connect(db_path) as db: + cur = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ) + return {r[0] for r in await cur.fetchall()} + + tables = run(check()) + assert "abgeordnetenwatch_polls" in tables + assert "abgeordnetenwatch_votes" in tables + + +# ─── fallback_drucksache_by_date_title (#142 Phase 3) ──────────────────────── + +class TestFallbackDrucksacheByDateTitle: + """Unit-Tests für den Datum+Titel-Fallback-Lookup gegen die Assessments-DB.""" + + @pytest.fixture() + def db_with_assessment(self, initialized_db): + """DB mit einem vorbereiteten Assessment für den Fallback-Test.""" + from app import database + run(database.upsert_assessment({ + "drucksache": "7/1234", + "title": "Antrag zur Verbesserung des Nahverkehrs in MV", + "bundesland": "MV", + "datum": "2026-04-10", + "fraktionen": ["SPD"], + "gwoeScore": 7.0, + "gwoeBegründung": "Test", + "gwoeMatrix": [], + "gwoeSchwerpunkt": [], + "wahlprogrammScores": [], + "verbesserungen": [], + "stärken": [], + "schwächen": [], + "themen": [], + "antragZusammenfassung": "", + "antragKernpunkte": [], + "source": "batch", + "model": "test", + })) + return initialized_db + + def test_fallback_finds_match_within_14_days(self, db_with_assessment): + from app.abgeordnetenwatch import fallback_drucksache_by_date_title + result = run(fallback_drucksache_by_date_title( + datum="2026-04-12", # 2 Tage nach Assessment-Datum + titel="Verbesserung des Nahverkehrs in MV", + bundesland="MV", + )) + assert result == "7/1234" + + def test_fallback_returns_none_outside_14_days(self, db_with_assessment): + from app.abgeordnetenwatch import fallback_drucksache_by_date_title + result = run(fallback_drucksache_by_date_title( + datum="2026-05-01", # 21 Tage nach Assessment-Datum + titel="Verbesserung des Nahverkehrs in MV", + bundesland="MV", + )) + assert result is None + + def test_fallback_returns_none_wrong_bundesland(self, db_with_assessment): + from app.abgeordnetenwatch import fallback_drucksache_by_date_title + result = run(fallback_drucksache_by_date_title( + datum="2026-04-10", + titel="Verbesserung des Nahverkehrs in MV", + bundesland="BY", # falsches BL + )) + assert result is None + + def test_fallback_returns_none_no_titel_match(self, db_with_assessment): + from app.abgeordnetenwatch import fallback_drucksache_by_date_title + result = run(fallback_drucksache_by_date_title( + datum="2026-04-10", + titel="Irgendwas voellig anderes ohne Treffer", + bundesland="MV", + )) + assert result is None + + def test_fallback_returns_none_for_missing_inputs(self, db_with_assessment): + from app.abgeordnetenwatch import fallback_drucksache_by_date_title + assert run(fallback_drucksache_by_date_title(None, "Titel", "MV")) is None + assert run(fallback_drucksache_by_date_title("2026-04-10", None, "MV")) is None diff --git a/tests/test_abonnement_repository.py b/tests/test_abonnement_repository.py new file mode 100644 index 0000000..22f3380 --- /dev/null +++ b/tests/test_abonnement_repository.py @@ -0,0 +1,95 @@ +"""Tests für AbonnementRepository (#136, ADR 0008).""" +from __future__ import annotations + +import asyncio + +from app.repositories import ( + AbonnementRepository, + InMemoryAbonnementRepository, + SqliteAbonnementRepository, +) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +class TestProtocolConformance: + def test_in_memory_implements_protocol(self): + repo = InMemoryAbonnementRepository() + assert isinstance(repo, AbonnementRepository) + + def test_sqlite_implements_protocol(self): + repo = SqliteAbonnementRepository() + assert isinstance(repo, AbonnementRepository) + + +class TestCreateAndList: + def test_create_returns_id(self): + repo = InMemoryAbonnementRepository() + sid = _run(repo.create("u1", "a@b.de")) + assert sid == 1 + + def test_create_increments_id(self): + repo = InMemoryAbonnementRepository() + s1 = _run(repo.create("u1", "a@b.de")) + s2 = _run(repo.create("u1", "c@d.de")) + assert s2 == s1 + 1 + + def test_list_by_user_filters(self): + repo = InMemoryAbonnementRepository() + _run(repo.create("u1", "a@b.de")) + _run(repo.create("u2", "c@d.de")) + rows = _run(repo.list_by_user("u1")) + assert len(rows) == 1 + assert rows[0]["email"] == "a@b.de" + + def test_list_all_returns_every_sub(self): + repo = InMemoryAbonnementRepository() + _run(repo.create("u1", "a@b.de")) + _run(repo.create("u2", "c@d.de")) + assert len(_run(repo.list_all())) == 2 + + +class TestListDue: + def test_list_due_returns_unsent(self): + repo = InMemoryAbonnementRepository() + _run(repo.create("u1", "a@b.de")) + due = _run(repo.list_due()) + assert len(due) == 1 + + def test_mark_sent_removes_from_due(self): + repo = InMemoryAbonnementRepository() + sid = _run(repo.create("u1", "a@b.de")) + _run(repo.mark_sent(sid)) + assert _run(repo.list_due()) == [] + + def test_list_due_filters_by_frequency(self): + repo = InMemoryAbonnementRepository() + _run(repo.create("u1", "a@b.de", frequency="daily")) + _run(repo.create("u2", "c@d.de", frequency="weekly")) + daily = _run(repo.list_due("daily")) + weekly = _run(repo.list_due("weekly")) + assert len(daily) == 1 and daily[0]["email"] == "a@b.de" + assert len(weekly) == 1 and weekly[0]["email"] == "c@d.de" + + +class TestDelete: + def test_delete_checks_ownership(self): + repo = InMemoryAbonnementRepository() + sid = _run(repo.create("u1", "a@b.de")) + # Fremder User kann nicht löschen + assert _run(repo.delete("u2", sid)) is False + # Eigentümer kann löschen + assert _run(repo.delete("u1", sid)) is True + + def test_delete_by_id_skips_ownership_check(self): + repo = InMemoryAbonnementRepository() + sid = _run(repo.create("u1", "a@b.de")) + # delete_by_id ist für Unsubscribe-Links (Token-gesichert), nicht für + # den Self-Service; kein User-Check + assert _run(repo.delete_by_id(sid)) is True + + def test_delete_by_id_missing_returns_false(self): + repo = InMemoryAbonnementRepository() + assert _run(repo.delete_by_id(999)) is False diff --git a/tests/test_antrag_repository.py b/tests/test_antrag_repository.py new file mode 100644 index 0000000..4a8d81f --- /dev/null +++ b/tests/test_antrag_repository.py @@ -0,0 +1,158 @@ +"""Tests für AntragRepository (#136, ADR 0008). + +Das Protocol definiert den Vertrag — beide Implementationen +(``Sqlite*`` und ``InMemory*``) müssen sich daran halten. Die +SqliteAntragRepository-Implementation wird hier nur gegen die +Protocol-Konformität geprüft, nicht gegen echte DB-I/O; dafür sind +die bestehenden DB-Tests zuständig. +""" +from __future__ import annotations + +import asyncio + +import pytest + +from app.repositories import ( + AntragRepository, + InMemoryAntragRepository, + SqliteAntragRepository, +) + + +# ─── Protocol-Konformität ─────────────────────────────────────────────────── + +class TestProtocolConformance: + def test_in_memory_implements_protocol(self): + repo = InMemoryAntragRepository() + assert isinstance(repo, AntragRepository) + + def test_sqlite_implements_protocol(self): + repo = SqliteAntragRepository() + assert isinstance(repo, AntragRepository) + + +# ─── InMemoryAntragRepository — Vertrag ───────────────────────────────────── + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def _make_assessment(drucksache: str = "18/1", bundesland: str = "NRW", + gwoe_score: float = 5.0, title: str = "Test-Antrag", + fraktionen=None, themen=None) -> dict: + return { + "drucksache": drucksache, + "title": title, + "bundesland": bundesland, + "gwoe_score": gwoe_score, + "fraktionen": fraktionen or ["SPD"], + "themen": themen or ["Bildung"], + } + + +class TestInMemoryRepoSaveAndGet: + def test_save_and_get_round_trip(self): + repo = InMemoryAntragRepository() + a = _make_assessment(drucksache="18/42", gwoe_score=7.5) + assert _run(repo.save(a)) is True + stored = _run(repo.get("18/42")) + assert stored is not None + assert stored["gwoe_score"] == 7.5 + + def test_get_returns_none_for_missing(self): + repo = InMemoryAntragRepository() + assert _run(repo.get("18/999")) is None + + def test_save_requires_drucksache(self): + repo = InMemoryAntragRepository() + with pytest.raises(ValueError): + _run(repo.save({"title": "oh no"})) + + def test_save_twice_overwrites_last_wins(self): + """UPSERT-Semantik: zweites save überschreibt das erste — wie in SQL.""" + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", gwoe_score=3.0))) + _run(repo.save(_make_assessment("18/1", gwoe_score=8.0))) + stored = _run(repo.get("18/1")) + assert stored["gwoe_score"] == 8.0 + + def test_get_returns_independent_copy(self): + """Mutation des zurückgegebenen Dicts darf den Store nicht verändern.""" + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", gwoe_score=5.0))) + r1 = _run(repo.get("18/1")) + r1["gwoe_score"] = 999.0 + r2 = _run(repo.get("18/1")) + assert r2["gwoe_score"] == 5.0 + + +class TestInMemoryRepoList: + def test_list_empty(self): + repo = InMemoryAntragRepository() + assert _run(repo.list()) == [] + + def test_list_sorted_by_gwoe_score_desc(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", gwoe_score=3.0))) + _run(repo.save(_make_assessment("18/2", gwoe_score=9.0))) + _run(repo.save(_make_assessment("18/3", gwoe_score=6.0))) + rows = _run(repo.list()) + assert [r["drucksache"] for r in rows] == ["18/2", "18/3", "18/1"] + + def test_list_filter_by_bundesland(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", bundesland="NRW"))) + _run(repo.save(_make_assessment("18/2", bundesland="HE"))) + rows = _run(repo.list(bundesland="HE")) + assert len(rows) == 1 + assert rows[0]["bundesland"] == "HE" + + def test_list_all_pseudo_bundesland_is_noop(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", bundesland="NRW"))) + _run(repo.save(_make_assessment("18/2", bundesland="HE"))) + assert len(_run(repo.list(bundesland="ALL"))) == 2 + + +class TestInMemoryRepoSearch: + def test_search_by_title(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", title="Klimaschutz für alle"))) + _run(repo.save(_make_assessment("18/2", title="Steuerreform"))) + rows = _run(repo.search("Klima")) + assert len(rows) == 1 + assert rows[0]["drucksache"] == "18/1" + + def test_search_by_themen(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1", themen=["Verkehr"]))) + _run(repo.save(_make_assessment("18/2", themen=["Bildung"]))) + rows = _run(repo.search("bildung")) + assert len(rows) == 1 + assert rows[0]["drucksache"] == "18/2" + + def test_search_respects_limit(self): + repo = InMemoryAntragRepository() + for i in range(10): + _run(repo.save(_make_assessment(f"18/{i}", title="Klimaschutz", gwoe_score=i))) + rows = _run(repo.search("Klimaschutz", limit=3)) + assert len(rows) == 3 + + +class TestInMemoryRepoDelete: + def test_delete_existing(self): + repo = InMemoryAntragRepository() + _run(repo.save(_make_assessment("18/1"))) + assert _run(repo.delete("18/1")) is True + assert _run(repo.get("18/1")) is None + + def test_delete_missing_returns_false(self): + repo = InMemoryAntragRepository() + assert _run(repo.delete("18/999")) is False + + +class TestInitialSeed: + def test_initial_seed_fills_store(self): + seed = [_make_assessment("18/1"), _make_assessment("18/2")] + repo = InMemoryAntragRepository(initial=seed) + assert len(_run(repo.list())) == 2 diff --git a/tests/test_auth.py b/tests/test_auth.py index aa22bbc..90a4358 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,8 +1,9 @@ -"""Tests for app/auth.py — Keycloak JWT authentication (#43). +"""Tests for app/auth.py — Keycloak JWT authentication (#43, #129). These tests cover the auth module WITHOUT a running Keycloak server: - Token extraction from headers/cookies - Auth-disabled detection (Dev-Modus) +- direct_login — Keycloak Direct Access Grant (gemockt via httpx) - _pick_best_title helper (in main.py, tested here for convenience) """ import sys @@ -21,7 +22,9 @@ if "jose" not in sys.modules: sys.modules["jose.jwt"] = jose_jwt import pytest -from unittest.mock import MagicMock +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi import HTTPException from app.auth import _extract_token, _is_auth_enabled @@ -74,6 +77,110 @@ class TestIsAuthEnabled: assert _is_auth_enabled() is True +class TestDirectLogin: + """Tests für direct_login() in auth.py — Keycloak Direct Access Grant (#129). + + Alle Keycloak-HTTP-Calls werden via unittest.mock.patch gemockt. + Kein laufender Keycloak-Server nötig. + """ + + def _run(self, coro): + return asyncio.get_event_loop().run_until_complete(coro) + + def _make_resp(self, status_code: int, body: dict): + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = body + return resp + + def test_success_returns_token_data(self, monkeypatch): + """Bei 200 von Keycloak gibt direct_login das Token-Dict zurück.""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") + monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") + monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") + + token_response = { + "access_token": "eyABC", + "refresh_token": "ryDEF", + "expires_in": 300, + "refresh_expires_in": 1800, + } + mock_resp = self._make_resp(200, token_response) + + async def _mock_post(*args, **kwargs): + return mock_resp + + mock_client = AsyncMock() + mock_client.post = _mock_post + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("app.auth.httpx.AsyncClient", return_value=mock_client): + from app.auth import direct_login + result = self._run(direct_login("user", "pw")) + + assert result["access_token"] == "eyABC" + assert result["refresh_token"] == "ryDEF" + assert result["expires_in"] == 300 + + def test_invalid_credentials_raises_401(self, monkeypatch): + """Bei 401 von Keycloak wirft direct_login HTTPException(status_code=401).""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") + monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") + monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") + + mock_resp = self._make_resp(401, {"error": "invalid_grant", "error_description": "Ungültige Anmeldedaten"}) + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("app.auth.httpx.AsyncClient", return_value=mock_client): + from app.auth import direct_login + with pytest.raises(HTTPException) as exc_info: + self._run(direct_login("user", "falsch")) + + assert exc_info.value.status_code == 401 + assert "Ungültige Anmeldedaten" in exc_info.value.detail + + def test_keycloak_error_raises_non_401(self, monkeypatch): + """Bei 500 von Keycloak wirft direct_login HTTPException mit dem Keycloak-Statuscode.""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") + monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") + monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") + + mock_resp = self._make_resp(500, {"error_description": "Internal Server Error"}) + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("app.auth.httpx.AsyncClient", return_value=mock_client): + from app.auth import direct_login + with pytest.raises(HTTPException) as exc_info: + self._run(direct_login("user", "pw")) + + assert exc_info.value.status_code == 500 + + def test_auth_disabled_raises_400(self, monkeypatch): + """Wenn Auth nicht konfiguriert ist, wirft direct_login HTTPException(400).""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "") + monkeypatch.setattr(config.settings, "keycloak_realm", "") + monkeypatch.setattr(config.settings, "keycloak_client_id", "") + + from app.auth import direct_login + with pytest.raises(HTTPException) as exc_info: + self._run(direct_login("user", "pw")) + + assert exc_info.value.status_code == 400 + + try: from app.main import _pick_best_title _HAS_MAIN = True diff --git a/tests/test_bewertung_repository.py b/tests/test_bewertung_repository.py new file mode 100644 index 0000000..364ba67 --- /dev/null +++ b/tests/test_bewertung_repository.py @@ -0,0 +1,49 @@ +"""Tests für BewertungRepository — Assessment-Versionshistorie (#136, ADR 0008).""" +from __future__ import annotations + +import asyncio + +from app.repositories import ( + BewertungRepository, + InMemoryBewertungRepository, + SqliteBewertungRepository, +) + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +class TestProtocolConformance: + def test_in_memory_implements_protocol(self): + repo = InMemoryBewertungRepository() + assert isinstance(repo, BewertungRepository) + + def test_sqlite_implements_protocol(self): + repo = SqliteBewertungRepository() + assert isinstance(repo, BewertungRepository) + + +class TestVersionHistory: + def test_empty_history_for_unknown_drucksache(self): + repo = InMemoryBewertungRepository() + assert _run(repo.versions("18/1")) == [] + + def test_versions_sorted_newest_first(self): + """Contract: neueste Version zuerst — wie bei + database.get_assessment_history (ORDER BY version DESC).""" + repo = InMemoryBewertungRepository() + repo.add_version("18/1", version=1, gwoe_score=4.0, model="qwen-plus") + repo.add_version("18/1", version=2, gwoe_score=7.0, model="qwen-plus") + repo.add_version("18/1", version=3, gwoe_score=6.5, model="qwen-plus") + rows = _run(repo.versions("18/1")) + assert [r["version"] for r in rows] == [3, 2, 1] + + def test_versions_filter_by_drucksache(self): + repo = InMemoryBewertungRepository() + repo.add_version("18/1", version=1, gwoe_score=5.0, model="qwen-plus") + repo.add_version("18/2", version=1, gwoe_score=8.0, model="qwen-plus") + rows_a = _run(repo.versions("18/1")) + rows_b = _run(repo.versions("18/2")) + assert len(rows_a) == 1 and rows_a[0]["gwoe_score"] == 5.0 + assert len(rows_b) == 1 and rows_b[0]["gwoe_score"] == 8.0 diff --git a/tests/test_bug_regressions.py b/tests/test_bug_regressions.py new file mode 100644 index 0000000..9147811 --- /dev/null +++ b/tests/test_bug_regressions.py @@ -0,0 +1,408 @@ +"""Bug-Regression-Tests für fünf Fix-Commits ohne bisherige Test-Coverage. + +Je ein Mini-Integration-Test anchored am konkreten Fehler: + 3e71547 — PRAGMA cursor fetchall() vor iteration in SQLite + 49c1b92 — JWT azp statt aud bei Keycloak Public Clients + 1414057 — CDU-PDF AssertionError Fallback in render_highlighted_page + 5ea507b — PFLICHT-FRAKTIONEN = alle LT-Fraktionen + 038ebd6 — NRW-Titel + Regierungsfraktionen-Pflicht im LLM-Prompt +""" + +import asyncio +import json +import sqlite3 +import sys +import types + +import pytest + +# --------------------------------------------------------------------------- +# Stubs für externe Deps, die in der lokalen Dev-Umgebung nicht vollständig +# installiert sind (openai AsyncOpenAI, aiosmtplib, etc.) +# --------------------------------------------------------------------------- +if "openai" not in sys.modules or not hasattr(sys.modules.get("openai"), "AsyncOpenAI"): + openai_stub = types.ModuleType("openai") + openai_stub.OpenAI = lambda **kw: None + openai_stub.AsyncOpenAI = lambda **kw: None + sys.modules["openai"] = openai_stub + + +# =========================================================================== +# Bug 1 — PRAGMA cursor fetchall() vor Iteration (Commit 3e71547) +# =========================================================================== +# Vor dem Fix wurde `conn.execute("PRAGMA table_info(...)")` direkt iteriert, +# ohne fetchall() aufzurufen. Bei aiosqlite führt das zu einer Exception oder +# leeren Ergebnissen. Der Fix: `cursor.fetchall()` vor dem Set-Comprehension. +# Regression: init_db() muss die PRAGMA-Rows korrekt auslesen — fehlt die +# Spalte, wird ein ALTER TABLE versucht; ist sie da, wird nichts gemacht. + +class TestPragmaCursorFetchall: + def test_table_info_fetchall_reads_columns(self, tmp_path): + """PRAGMA table_info liefert Spaltennamen korrekt via fetchall().""" + db = tmp_path / "test.db" + conn = sqlite3.connect(str(db)) + conn.execute("CREATE TABLE assessments (drucksache TEXT, konfidenz TEXT, summary_embedding BLOB)") + conn.commit() + + # Exakt wie in database.py nach dem Fix: execute().fetchall() + cursor = conn.execute("PRAGMA table_info(assessments)") + cols = {r[1] for r in cursor.fetchall()} + conn.close() + + assert "konfidenz" in cols + assert "summary_embedding" in cols + assert "drucksache" in cols + + def test_fetchall_before_set_comprehension_no_crash(self, tmp_path): + """Direktes Iterieren über cursor ohne fetchall() — Regression-Guard. + + Früher wurde `for r in cursor` statt `for r in cursor.fetchall()` verwendet. + Der Test stellt sicher, dass fetchall() explizit aufgerufen wird und + die Ergebnisliste nicht leer ist. + """ + db = tmp_path / "test2.db" + conn = sqlite3.connect(str(db)) + conn.execute("CREATE TABLE jobs (id TEXT, drucksache TEXT, status TEXT)") + conn.commit() + + cursor = conn.execute("PRAGMA table_info(jobs)") + rows = cursor.fetchall() # Fix: fetchall() vor Iteration + cols = {r[1] for r in rows} + conn.close() + + assert len(cols) == 3 + assert "drucksache" in cols + + def test_init_db_does_not_crash_on_existing_db(self, tmp_path, monkeypatch): + """init_db() läuft auf einer bereits initialisierten DB durch (kein PRAGMA-Crash).""" + from app import config + monkeypatch.setattr(config.settings, "db_path", tmp_path / "gwoe.db") + + from app.database import init_db + asyncio.get_event_loop().run_until_complete(init_db()) + # Zweiter Aufruf — alle ALTER-TABLE-Checks laufen durch + asyncio.get_event_loop().run_until_complete(init_db()) + + +# =========================================================================== +# Bug 2 — JWT azp statt aud (Commit 49c1b92) +# =========================================================================== +# Keycloak setzt bei Public Clients aud="account", nicht den client_id. +# Der Fix: verify_aud=False, stattdessen payload["azp"] == client_id prüfen. + +class TestJwtAzpCheck: + """_validate_token prüft azp, nicht aud.""" + + def _run(self, coro): + return asyncio.get_event_loop().run_until_complete(coro) + + def _patch_jose(self, payload: dict): + """Stub jose.jwt so it returns the given payload on decode().""" + jose_mod = types.ModuleType("jose") + jose_jwt = types.ModuleType("jose.jwt") + jose_jwt.get_unverified_header = lambda t: {"kid": "test-kid", "alg": "RS256"} + jose_jwt.decode = lambda token, key, **kw: payload + jose_mod.jwt = jose_jwt + jose_mod.JWTError = Exception + jose_mod.ExpiredSignatureError = type("ExpiredSignatureError", (Exception,), {}) + sys.modules["jose"] = jose_mod + sys.modules["jose.jwt"] = jose_jwt + return jose_mod + + def test_valid_azp_returns_user(self, monkeypatch): + """Wenn azp == client_id, gibt _validate_token ein User-Dict zurück.""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") + monkeypatch.setattr(config.settings, "keycloak_realm", "realm") + monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client") + + payload = { + "sub": "user-123", + "email": "user@test.de", + "preferred_username": "testuser", + "azp": "my-client", # korrekt + "aud": "account", # Public Client — nicht unser client_id + "realm_access": {"roles": []}, + } + self._patch_jose(payload) + + jwks = {"keys": [{"kid": "test-kid", "kty": "RSA"}]} + + from app.auth import _validate_token + import app.auth as auth_mod + # Stub _get_jwks + async def _fake_jwks(): + return jwks + + monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks) + result = self._run(_validate_token("fake.jwt.token")) + + assert result is not None + assert result["sub"] == "user-123" + + def test_wrong_azp_returns_none(self, monkeypatch): + """Wenn azp != client_id, gibt _validate_token None zurück (auch wenn aud passt).""" + from app import config + monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") + monkeypatch.setattr(config.settings, "keycloak_realm", "realm") + monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client") + + payload = { + "sub": "attacker-999", + "azp": "other-client", # falscher azp + "aud": "my-client", # aud passt zufällig — darf NICHT reichen + "realm_access": {"roles": []}, + } + self._patch_jose(payload) + + jwks = {"keys": [{"kid": "test-kid"}]} + + from app.auth import _validate_token + import app.auth as auth_mod + async def _fake_jwks(): + return jwks + monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks) + + result = self._run(_validate_token("fake.jwt.token")) + assert result is None, "azp-Mismatch muss zu None führen" + + def test_verify_aud_is_disabled(self): + """Die Source von _validate_token muss options={'verify_aud': False} enthalten.""" + import inspect + from app.auth import _validate_token + source = inspect.getsource(_validate_token) + assert "verify_aud" in source and "False" in source, ( + "_validate_token muss verify_aud=False in den JWT-decode-Options setzen" + ) + + def test_azp_field_checked_not_aud(self): + """Die Source von _validate_token muss explizit 'azp' prüfen, nicht 'aud'.""" + import inspect + from app.auth import _validate_token + source = inspect.getsource(_validate_token) + assert 'payload.get("azp")' in source or "payload['azp']" in source, ( + "_validate_token muss payload['azp'] gegen client_id prüfen" + ) + + +# =========================================================================== +# Bug 3 — CDU-PDF AssertionError Fallback (Commit 1414057) +# =========================================================================== +# render_highlighted_page() von embeddings.py wirft bei manchen CDU-PDFs einen +# AssertionError in PyMuPDF tobytes(). Der Fix fängt (AssertionError, Exception) +# und gibt die ursprüngliche PDF-Datei zurück. + +class TestCduPdfAssertionFallback: + def test_tobytes_assertionerror_returns_original_pdf(self, tmp_path, monkeypatch): + """Wenn tobytes() AssertionError wirft, liefert render_highlighted_page das Original-PDF.""" + import app.embeddings as emb_mod + + # Minimales Fake-PDF (realer Inhalt nicht nötig, nur Bytes) + fake_pdf_bytes = b"%PDF-1.4 fakecontent" + pdf_path = tmp_path / "cdu-test.pdf" + pdf_path.write_bytes(fake_pdf_bytes) + + # Stub PROGRAMME registry + monkeypatch.setattr( + emb_mod, "PROGRAMME", + {"cdu-test": {"name": "CDU Test", "typ": "wahlprogramm", + "partei": "CDU", "pdf": "cdu-test.pdf"}}, + ) + + class FakePage: + def search_for(self, needle): + return [] + def add_highlight_annot(self, rect): + return None + + class FakeDoc: + def __init__(self, *a, **kw): + self._pages = [FakePage()] + def __len__(self): + return 1 + def __getitem__(self, idx): + return self._pages[idx] + def tobytes(self): + raise AssertionError("CDU-PDF kaputt — AssertionError aus PyMuPDF") + def close(self): + pass + + # fitz is a thin wrapper around pymupdf; patch the fitz.open used inside embeddings.py + import fitz + import pymupdf + # Patch both possible references + monkeypatch.setattr(fitz, "open", FakeDoc, raising=False) + monkeypatch.setattr(pymupdf, "open", FakeDoc, raising=False) + + # Redirect referenzen-Pfad zu tmp_path + from pathlib import Path as _Path + + original_truediv = _Path.__truediv__ + + def _redirect_truediv(self, other): + result = original_truediv(self, other) + if "referenzen" in str(result) and str(other) == "cdu-test.pdf": + return pdf_path + if "referenzen" in str(result): + return tmp_path + return result + + monkeypatch.setattr(_Path, "__truediv__", _redirect_truediv) + + pdf_bytes, found_page, highlighted = emb_mod.render_highlighted_page( + "cdu-test", seite=1, query="Wirtschaft" + ) + + assert pdf_bytes == fake_pdf_bytes, "Fallback muss Original-PDF-Bytes zurückgeben" + assert highlighted is False + + def test_assertion_error_fallback_present_in_source(self): + """render_highlighted_page muss AssertionError in einem try/except fangen.""" + import inspect + import app.embeddings as emb_mod + source = inspect.getsource(emb_mod.render_highlighted_page) + assert "AssertionError" in source, ( + "render_highlighted_page muss AssertionError explizit fangen (CDU-PDF-Fallback)" + ) + + +# =========================================================================== +# Bug 4 — PFLICHT-FRAKTIONEN = alle LT-Fraktionen (Commit 5ea507b) +# =========================================================================== +# Vor dem Fix wurden nur Antragsteller + Regierungsfraktionen als PFLICHT- +# FRAKTIONEN ausgegeben. Der Fix gibt alle landtagsfraktionen aus. + +class TestPflichtFraktionen: + def _build_user_prompt(self, bundesland: str = "NRW") -> str: + """Baut den user_prompt wie in analyzer.py — minimal, ohne LLM-Call.""" + from app.bundeslaender import BUNDESLAENDER + from app.analyzer import get_bundesland_context + + bl = BUNDESLAENDER[bundesland] + pflicht = ", ".join(bl.landtagsfraktionen) + return ( + f"**PFLICHT-FRAKTIONEN:** Du MUSST ALLE folgenden Fraktionen der " + f"aktuellen Wahlperiode in `wahlprogrammScores` bewerten — keine auslassen:\n" + f"{pflicht}" + ) + + def test_afd_in_pflicht_fraktionen_nrw(self): + """AfD muss in PFLICHT-FRAKTIONEN für NRW stehen, auch ohne Antragsteller.""" + prompt = self._build_user_prompt("NRW") + assert "AfD" in prompt, "AfD fehlt in PFLICHT-FRAKTIONEN (NRW)" + + def test_all_nrw_fraktionen_in_prompt(self): + """Alle NRW-Landtagsfraktionen müssen im PFLICHT-Block stehen.""" + from app.bundeslaender import BUNDESLAENDER + prompt = self._build_user_prompt("NRW") + for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: + assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im PFLICHT-Block" + + def test_analyzer_user_prompt_contains_all_fraktionen(self, monkeypatch): + """analyze_antrag baut einen user_prompt mit allen LT-Fraktionen als PFLICHT.""" + # Nach ADR 0008: Wir reichen einen FakeLlmBewerter statt den + # AsyncOpenAI-Client zu monkeypatchen. Der Fake captured den + # user_prompt aus dem ``LlmRequest`` und liefert ein minimales + # gültiges Assessment-Dict zurück. + import app.analyzer as analyzer_mod + from app.bundeslaender import BUNDESLAENDER + + captured_prompts: list[str] = [] + + class FakeBewerter: + async def bewerte(self, request): + captured_prompts.append(request.user_prompt) + return { + "drucksache": "18/1", + "title": "Test", + "fraktionen": ["SPD"], + "datum": "2024-01-01", + "link": None, + "gwoeScore": 5, + "gwoeBegründung": "Test", + "gwoeMatrix": [], + "gwoeSchwerpunkt": [], + "wahlprogrammScores": [], + "verbesserungen": [], + "stärken": [], + "schwächen": [], + "empfehlung": "Überarbeiten", + "empfehlungSymbol": "[!]", + "verbesserungspotenzial": "mittel", + "themen": [], + "antragZusammenfassung": "Test", + "antragKernpunkte": [], + "konfidenz": "mittel", + "shareThreads": "", + "shareTwitter": "", + "shareMastodon": "", + } + + import app.embeddings as emb_mod + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", type("P", (), {"exists": lambda self: False})()) + + asyncio.get_event_loop().run_until_complete( + analyzer_mod.analyze_antrag( + text="Der SPD-Antrag fordert mehr Klimaschutz in NRW.", + bundesland="NRW", + model="qwen-plus", + bewerter=FakeBewerter(), + ) + ) + + assert captured_prompts, "user_prompt muss gebaut worden sein" + prompt = captured_prompts[0] + + # AfD ist keine Regierungsfraktion in NRW — muss aber trotzdem stehen + assert "AfD" in prompt, "AfD fehlt im user_prompt (PFLICHT-FRAKTIONEN-Bug)" + # Alle NRW-Fraktionen prüfen + for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: + assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im user_prompt" + + +# =========================================================================== +# Bug 5 — NRW-Titel + Regierungsfraktionen im LLM-Prompt (Commit 038ebd6) +# =========================================================================== +# get_bundesland_context() muss den Parlamentsnamen und die Regierungsfraktionen +# korrekt im Context-String ausgeben. + +class TestNrwTitelRegierungsfraktionen: + def test_bundesland_context_contains_regierungsfraktionen(self): + """get_bundesland_context gibt für NRW die aktuellen Regierungsfraktionen aus.""" + from app.analyzer import get_bundesland_context + from app.bundeslaender import BUNDESLAENDER + + ctx = get_bundesland_context("NRW") + + for regfrak in BUNDESLAENDER["NRW"].regierungsfraktionen: + assert regfrak in ctx, ( + f"Regierungsfraktion {regfrak!r} fehlt im Bundesland-Context für NRW" + ) + + def test_bundesland_context_contains_parliament_name(self): + """get_bundesland_context gibt den Parlamentsnamen aus.""" + from app.analyzer import get_bundesland_context + from app.bundeslaender import BUNDESLAENDER + + ctx = get_bundesland_context("NRW") + parliament = BUNDESLAENDER["NRW"].parlament_name + assert parliament in ctx, ( + f"Parlamentsname {parliament!r} fehlt im Context-String" + ) + + def test_bundesland_context_contains_landtagsfraktionen(self): + """get_bundesland_context listet alle LT-Fraktionen auf.""" + from app.analyzer import get_bundesland_context + from app.bundeslaender import BUNDESLAENDER + + ctx = get_bundesland_context("NRW") + for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: + assert fraktion in ctx, ( + f"Landtagsfraktion {fraktion!r} fehlt im Bundesland-Context" + ) + + def test_regierungsfraktionen_label_present_in_context(self): + """Der Context-String enthält den Label 'Regierungsfraktionen'.""" + from app.analyzer import get_bundesland_context + ctx = get_bundesland_context("NRW") + assert "Regierungsfraktionen" in ctx diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..1e72a3a --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,438 @@ +"""Unit-Tests für app/clustering.py (#134 Phase 2). + +Testet reine Python-Funktionen (_cosine, UnionFind, _cluster_indices, +_cluster_label, _dominant_fraktion, _cluster_summary) mit synthetischen +Fixtures. DB-abhängige async-Funktionen (load_assessment_items, +build_hierarchy, find_similar_assessments) werden mit gemocktem DB-Lader +getestet. + +Fixture-Corpus: normalisierte Vektoren per Pure-Python (kein numpy nötig). +""" +from __future__ import annotations + +import asyncio +import math +import random +from unittest.mock import patch + +import pytest + + +# ─── Hilfsfunktionen ───────────────────────────────────────────────────────── + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def _norm_py(v: list[float]) -> list[float]: + """Normalisiert einen Vektor auf Länge 1 (pure Python).""" + n = math.sqrt(sum(x * x for x in v)) + return [x / n for x in v] if n > 0 else v + + +def _make_items(n: int = 5, dim: int = 16, seed: int = 42) -> list[dict]: + """Erstellt n normalisierte Embedding-Dicts mit reproduzierbaren Zufallswerten.""" + rng = random.Random(seed) + items = [] + for i in range(n): + raw = [rng.gauss(0, 1) for _ in range(dim)] + items.append({ + "drucksache": f"18/{1000 + i}", + "title": f"Testantrag {i}", + "bundesland": "NRW", + "fraktionen": ["SPD"] if i % 2 == 0 else ["CDU"], + "datum": "2026-04-20", + "link": f"https://example.com/{i}", + "gwoe_score": 5.0 + i * 0.5, + "empfehlung": "Empfohlen", + "empfehlung_symbol": "✓", + "themen": [f"Thema{i % 3}"], + "embedding": _norm_py(raw), + }) + return items + + +# ─── _cosine ───────────────────────────────────────────────────────────────── + +class TestCosine: + def test_identical_vectors_give_one(self): + from app.clustering import _cosine + v = [1.0, 0.0, 0.0] + assert abs(_cosine(v, v) - 1.0) < 1e-9 + + def test_orthogonal_vectors_give_zero(self): + from app.clustering import _cosine + a = [1.0, 0.0] + b = [0.0, 1.0] + assert abs(_cosine(a, b)) < 1e-9 + + def test_opposite_vectors_give_minus_one(self): + from app.clustering import _cosine + a = [1.0, 0.0] + b = [-1.0, 0.0] + assert abs(_cosine(a, b) + 1.0) < 1e-9 + + def test_zero_vector_returns_zero(self): + from app.clustering import _cosine + assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0 + + def test_symmetry(self): + from app.clustering import _cosine + a = [0.6, 0.8] + b = [0.8, 0.6] + assert abs(_cosine(a, b) - _cosine(b, a)) < 1e-12 + + def test_range_normalized_vectors(self): + from app.clustering import _cosine + rng = random.Random(1) + for _ in range(10): + a = _norm_py([rng.gauss(0, 1) for _ in range(8)]) + b = _norm_py([rng.gauss(0, 1) for _ in range(8)]) + sim = _cosine(a, b) + assert -1.0 - 1e-9 <= sim <= 1.0 + 1e-9 + + +# ─── UnionFind ──────────────────────────────────────────────────────────────── + +class TestUnionFind: + def test_initial_all_separate(self): + from app.clustering import UnionFind + uf = UnionFind(4) + assert len({uf.find(i) for i in range(4)}) == 4 + + def test_union_merges_components(self): + from app.clustering import UnionFind + uf = UnionFind(4) + uf.union(0, 1) + uf.union(2, 3) + assert uf.find(0) == uf.find(1) + assert uf.find(2) == uf.find(3) + assert uf.find(0) != uf.find(2) + + def test_union_find_path_compression(self): + from app.clustering import UnionFind + uf = UnionFind(5) + uf.union(0, 1) + uf.union(1, 2) + uf.union(2, 3) + uf.union(3, 4) + root = uf.find(0) + assert all(uf.find(i) == root for i in range(5)) + + def test_union_self_no_error(self): + from app.clustering import UnionFind + uf = UnionFind(3) + uf.union(1, 1) + assert uf.find(1) == uf.find(1) + + def test_empty_union_find(self): + from app.clustering import UnionFind + uf = UnionFind(0) + assert uf.parent == [] + + +# ─── _cluster_indices ──────────────────────────────────────────────────────── + +class TestClusterIndices: + def test_empty_corpus_returns_empty(self): + from app.clustering import _cluster_indices + assert _cluster_indices([], 0.5) == [] + + def test_single_item_is_singleton(self): + from app.clustering import _cluster_indices + items = _make_items(1) + groups = _cluster_indices(items, 0.5) + assert len(groups) == 1 + assert len(groups[0]) == 1 + + def test_all_identical_items_one_cluster(self): + from app.clustering import _cosine, _cluster_indices + # Alle denselben Vektor → kosinus = 1.0 → alle in einem Cluster + v = [1.0, 0.0, 0.0] + items = [ + {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} + for i in range(4) + ] + groups = _cluster_indices(items, 0.5) + assert len(groups) == 1 + assert len(groups[0]) == 4 + + def test_orthogonal_items_all_singletons(self): + """Orthogonale Einheitsvektoren → kosinus=0 → alle Singletons.""" + from app.clustering import _cluster_indices + identity_vecs = [[1 if i == j else 0 for j in range(4)] for i in range(4)] + items = [ + {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} + for i, v in enumerate(identity_vecs) + ] + groups = _cluster_indices(items, 0.5) + # Alle Gruppen sind Singletons + assert all(len(g) == 1 for g in groups) + + def test_higher_threshold_fewer_clusters(self): + """Höherer Threshold → mehr Singletons, weniger große Cluster.""" + from app.clustering import _cluster_indices + items = _make_items(8, seed=99) + groups_low = _cluster_indices(items, 0.1) + groups_high = _cluster_indices(items, 0.99) + # Bei low threshold: mind. eine Gruppe > 1 möglich + # Bei high threshold (0.99): fast alle Singletons + singleton_low = sum(1 for g in groups_low if len(g) == 1) + singleton_high = sum(1 for g in groups_high if len(g) == 1) + assert singleton_high >= singleton_low + + def test_sorted_by_size_descending(self): + from app.clustering import _cluster_indices + v = [1.0, 0.0] + items = [ + {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} + for i in range(3) + ] + [ + {**_make_items(1)[0], "drucksache": "18/solo", "embedding": [0.0, 1.0]} + ] + groups = _cluster_indices(items, 0.5) + sizes = [len(g) for g in groups] + assert sizes == sorted(sizes, reverse=True) + + +# ─── _dominant_fraktion ─────────────────────────────────────────────────────── + +class TestDominantFraktion: + def test_majority_fraktion_wins(self): + from app.clustering import _dominant_fraktion + items = [ + {"fraktionen": ["SPD"]}, + {"fraktionen": ["SPD"]}, + {"fraktionen": ["CDU"]}, + ] + assert _dominant_fraktion(items) == "SPD" + + def test_empty_items_returns_none(self): + from app.clustering import _dominant_fraktion + assert _dominant_fraktion([]) is None + + def test_empty_fraktionen_lists_returns_none(self): + from app.clustering import _dominant_fraktion + items = [{"fraktionen": []}, {"fraktionen": None}] + assert _dominant_fraktion(items) is None + + +# ─── _cluster_label ─────────────────────────────────────────────────────────── + +class TestClusterLabel: + def test_top_theme_used_as_label(self): + from app.clustering import _cluster_label + items = [ + {"themen": ["Klimaschutz", "Energie"], "title": "A"}, + {"themen": ["Klimaschutz"], "title": "B"}, + ] + label = _cluster_label(items) + assert "Klimaschutz" in label + + def test_fallback_to_shortest_title(self): + from app.clustering import _cluster_label + items = [ + {"themen": [], "title": "Kurz"}, + {"themen": [], "title": "Sehr langer Titel"}, + ] + label = _cluster_label(items) + assert label == "Kurz" + + def test_fallback_cluster_label(self): + from app.clustering import _cluster_label + items = [{"themen": [], "title": None}] + label = _cluster_label(items) + assert label == "Cluster" + + +# ─── _cluster_summary ──────────────────────────────────────────────────────── + +class TestClusterSummary: + def test_basic_fields_present(self): + from app.clustering import _cluster_summary + items = _make_items(3) + summary = _cluster_summary(items) + for key in ("size", "label", "dominant_fraktion", "avg_gwoe_score", "drucksachen"): + assert key in summary + + def test_size_correct(self): + from app.clustering import _cluster_summary + items = _make_items(4) + summary = _cluster_summary(items) + assert summary["size"] == 4 + + def test_avg_score_calculated(self): + from app.clustering import _cluster_summary + items = [ + {**_make_items(1)[0], "gwoe_score": 4.0}, + {**_make_items(1)[0], "gwoe_score": 6.0}, + ] + summary = _cluster_summary(items) + assert summary["avg_gwoe_score"] == 5.0 + + def test_include_edges_adds_nodes_and_edges(self): + from app.clustering import _cluster_summary + items = _make_items(3) + summary = _cluster_summary(items, include_edges=True) + assert "nodes" in summary + assert "edges" in summary + assert len(summary["nodes"]) == 3 + # 3 Knoten → 3 Kanten (0-1, 0-2, 1-2) + assert len(summary["edges"]) == 3 + + def test_no_edges_without_flag(self): + from app.clustering import _cluster_summary + items = _make_items(3) + summary = _cluster_summary(items, include_edges=False) + assert "edges" not in summary + assert "nodes" not in summary + + +# ─── build_hierarchy (async, DB gemockt) ───────────────────────────────────── + +class TestBuildHierarchy: + def test_empty_corpus_structure(self): + """Leerer Corpus → korrekte Grundstruktur.""" + from app import clustering + + async def fake_load(bundesland=None): + return [] + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.build_hierarchy()) + + assert result["meta"]["total"] == 0 + assert result["clusters"] == [] + assert result["singletons"] == [] + + def test_single_item_becomes_singleton(self): + from app import clustering + items = _make_items(1) + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.build_hierarchy(threshold=0.5)) + + assert len(result["singletons"]) == 1 + assert result["clusters"] == [] + + def test_meta_fields_present(self): + from app import clustering + items = _make_items(4) + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.build_hierarchy()) + + meta = result["meta"] + for key in ("total", "threshold", "num_clusters", "num_singletons"): + assert key in meta + + def test_threshold_affects_cluster_count(self): + """Niedrigerer Threshold → mehr Kanten → potenziell mehr gebündelte Items.""" + from app import clustering + # Identische Items → immer ein Cluster bei jedem Threshold < 1.0 + v = [1.0, 0.0, 0.0] + items = [ + {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} + for i in range(3) + ] + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.build_hierarchy(threshold=0.5)) + + assert len(result["clusters"]) == 1 + assert result["clusters"][0]["size"] == 3 + + +# ─── find_similar_assessments (async, DB gemockt) ──────────────────────────── + +class TestFindSimilarAssessments: + def test_returns_empty_for_unknown_drucksache(self): + from app import clustering + items = _make_items(3) + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments("99/9999")) + + assert result == [] + + def test_returns_top_k_results(self): + from app import clustering + items = _make_items(5) + target_id = items[0]["drucksache"] + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments(target_id, top_k=3)) + + assert len(result) == 3 + + def test_excludes_self(self): + from app import clustering + items = _make_items(5) + target_id = items[0]["drucksache"] + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments(target_id, top_k=10)) + + drucksachen = [r["drucksache"] for r in result] + assert target_id not in drucksachen + + def test_result_sorted_by_similarity_descending(self): + from app import clustering + items = _make_items(5) + target_id = items[0]["drucksache"] + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments(target_id, top_k=4)) + + sims = [r["similarity"] for r in result] + assert sims == sorted(sims, reverse=True) + + def test_result_fields_present(self): + from app import clustering + items = _make_items(3) + target_id = items[0]["drucksache"] + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments(target_id, top_k=2)) + + for r in result: + for key in ("drucksache", "title", "bundesland", "fraktionen", + "gwoe_score", "empfehlung", "similarity"): + assert key in r + + def test_single_item_corpus_returns_empty(self): + """Nur ein Item im Corpus → nach Selbst-Ausschluss kein Ergebnis.""" + from app import clustering + items = _make_items(1) + + async def fake_load(bundesland=None): + return items + + with patch.object(clustering, "load_assessment_items", side_effect=fake_load): + result = run(clustering.find_similar_assessments(items[0]["drucksache"])) + + assert result == [] diff --git a/tests/test_database.py b/tests/test_database.py new file mode 100644 index 0000000..556c882 --- /dev/null +++ b/tests/test_database.py @@ -0,0 +1,554 @@ +"""Unit-Tests für app/database.py (#134 Phase 2). + +Alle Tests nutzen eine tmp-Datei als SQLite-DB (via tmp_path-Fixture). +settings.db_path wird per monkeypatch auf die tmp-Datei umgebogen. +Keine Prod-DB wird angetastet. +""" +from __future__ import annotations + +import asyncio +import sys +import types + +import pytest + +# test_mail.py und test_monitoring.py stubben aiosqlite als leeres +# ModuleType-Objekt (ohne .connect). Wenn diese Files zuerst oder parallel +# gesammelt werden, landet der Stub in sys.modules und database.py importiert +# ihn statt des echten Pakets. +# +# Strategie: Stub jetzt entfernen (falls schon drin) und das echte aiosqlite +# importieren. app.database NICHT aus sys.modules entfernen — eine bereits +# importierte Version mit dem echten aiosqlite soll erhalten bleiben. +# Dafür importieren wir aiosqlite und database schon hier auf Modulebene, +# damit die Bindung in database.py auf das echte Paket zeigt, bevor +# andere Test-Files den Stub injizieren. +_aio = sys.modules.get("aiosqlite") +if _aio is not None and not hasattr(_aio, "connect"): + del sys.modules["aiosqlite"] + +# Jetzt echtes aiosqlite laden und app.database mit diesem Paket importieren. +# Der Import passiert hier auf Modulebene (Collection-Zeit), also bevor +# test_mail.py / test_monitoring.py ihre Stubs setzen können. +import aiosqlite as _real_aiosqlite # noqa: E402 +# App-Package mit echtem aiosqlite importieren und in sys.modules verankern. +# Nachfolgende "from app import database" in Fixtures holen das gecachte Modul. +import importlib as _importlib +if "app.database" in sys.modules: + # Schon gecacht — prüfen ob es das echte aiosqlite hat + _db_mod = sys.modules["app.database"] + if not hasattr(getattr(_db_mod, "aiosqlite", None), "connect"): + # Gecachte Version hat den Stub → neu laden + del sys.modules["app.database"] + _importlib.import_module("app.database") +else: + _importlib.import_module("app.database") + +# aiosqlite muss echt importierbar sein — im Test-Env vorhanden, +# aber falls nicht: früh fehlschlagen statt still hängen. + +# ─── Hilfsfunktion für synchronen Aufruf ───────────────────────────────────── + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +# ─── DB-Fixture ─────────────────────────────────────────────────────────────── + +@pytest.fixture() +def db_path(tmp_path, monkeypatch): + """Setzt settings.db_path auf eine frische tmp-Datei und gibt den Pfad zurück.""" + path = tmp_path / "test.db" + from app.config import settings + monkeypatch.setattr(settings, "db_path", str(path)) + return str(path) + + +@pytest.fixture() +def initialized_db(db_path): + """Initialisierte DB — init_db() einmal gelaufen.""" + from app import database + run(database.init_db()) + return db_path + + +# ─── Minimaler Assessment-Dict ──────────────────────────────────────────────── + +def _assessment(drucksache: str = "18/1234", bundesland: str = "NRW", + score: float = 7.5) -> dict: + return { + "drucksache": drucksache, + "title": f"Testantrag {drucksache}", + "fraktionen": ["SPD", "GRÜNE"], + "datum": "2026-04-15", + "link": "https://example.com", + "bundesland": bundesland, + "gwoeScore": score, + "gwoeBegründung": "Gut.", + "gwoeMatrix": [{"dimension": "A1", "score": 5}], + "gwoeSchwerpunkt": ["A1"], + "wahlprogrammScores": [], + "verbesserungen": [], + "stärken": ["Klimaschutz"], + "schwächen": [], + "empfehlung": "Empfohlen", + "empfehlungSymbol": "✓", + "verbesserungspotenzial": "gering", + "themen": ["Klimaschutz"], + "antragZusammenfassung": "Zusammenfassung.", + "antragKernpunkte": ["Punkt 1"], + "source": "webapp", + "model": "qwen-plus", + "konfidenz": "hoch", + "fehlendeProgramme": [], + } + + +# ─── init_db ───────────────────────────────────────────────────────────────── + +class TestInitDb: + def test_creates_assessments_table(self, db_path): + import aiosqlite + from app import database + run(database.init_db()) + + async def check(): + async with aiosqlite.connect(db_path) as db: + cur = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='assessments'" + ) + return await cur.fetchone() + + row = run(check()) + assert row is not None + + def test_creates_jobs_table(self, db_path): + import aiosqlite + from app import database + run(database.init_db()) + + async def check(): + async with aiosqlite.connect(db_path) as db: + cur = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='jobs'" + ) + return await cur.fetchone() + + assert run(check()) is not None + + def test_creates_all_required_tables(self, db_path): + import aiosqlite + from app import database + run(database.init_db()) + + expected = { + "assessments", "jobs", "bookmarks", "comments", "votes", + "assessment_versions", "email_subscriptions", + "monitoring_scans", "monitoring_daily_summary", + } + + async def check(): + async with aiosqlite.connect(db_path) as db: + cur = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ) + return {r[0] for r in await cur.fetchall()} + + tables = run(check()) + assert expected <= tables + + def test_idempotent_double_call(self, db_path): + """init_db() zweimal aufrufen darf keinen Fehler werfen.""" + from app import database + run(database.init_db()) + run(database.init_db()) # darf nicht werfen + + +# ─── upsert_assessment / get_assessment ─────────────────────────────────────── + +class TestUpsertGetAssessment: + def test_round_trip(self, initialized_db): + from app import database + data = _assessment("18/9999") + run(database.upsert_assessment(data)) + result = run(database.get_assessment("18/9999")) + assert result is not None + assert result["drucksache"] == "18/9999" + assert result["bundesland"] == "NRW" + + def test_title_stored(self, initialized_db): + from app import database + data = _assessment("18/0001") + data["title"] = "Spezieller Titel" + run(database.upsert_assessment(data)) + result = run(database.get_assessment("18/0001")) + assert result["title"] == "Spezieller Titel" + + def test_gwoe_score_stored(self, initialized_db): + from app import database + data = _assessment("18/0002", score=8.5) + run(database.upsert_assessment(data)) + result = run(database.get_assessment("18/0002")) + assert result["gwoe_score"] == 8.5 + + def test_json_fields_deserialized(self, initialized_db): + from app import database + data = _assessment("18/0003") + run(database.upsert_assessment(data)) + result = run(database.get_assessment("18/0003")) + assert isinstance(result["fraktionen"], list) + assert isinstance(result["themen"], list) + + def test_missing_assessment_returns_none(self, initialized_db): + from app import database + result = run(database.get_assessment("99/9999")) + assert result is None + + def test_upsert_updates_existing(self, initialized_db): + from app import database + data = _assessment("18/0004", score=5.0) + run(database.upsert_assessment(data)) + data2 = _assessment("18/0004", score=9.0) + run(database.upsert_assessment(data2)) + result = run(database.get_assessment("18/0004")) + assert result["gwoe_score"] == 9.0 + + def test_upsert_archives_old_version(self, initialized_db): + """Bei Re-Save wird Vorversion in assessment_versions archiviert.""" + import aiosqlite + from app import database + data = _assessment("18/0005", score=5.0) + run(database.upsert_assessment(data)) + data2 = _assessment("18/0005", score=7.0) + run(database.upsert_assessment(data2)) + + async def count_versions(): + async with aiosqlite.connect(initialized_db) as db: + cur = await db.execute( + "SELECT COUNT(*) FROM assessment_versions WHERE drucksache='18/0005'" + ) + return (await cur.fetchone())[0] + + assert run(count_versions()) == 1 + + +# ─── get_all_assessments ────────────────────────────────────────────────────── + +class TestGetAllAssessments: + def test_returns_empty_list_initially(self, initialized_db): + from app import database + result = run(database.get_all_assessments()) + assert result == [] + + def test_returns_inserted_assessments(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/1001"))) + run(database.upsert_assessment(_assessment("18/1002"))) + result = run(database.get_all_assessments()) + assert len(result) == 2 + + def test_bundesland_filter_none_returns_all(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/1003", bundesland="NRW"))) + run(database.upsert_assessment(_assessment("18/1004", bundesland="BY"))) + result = run(database.get_all_assessments(bundesland=None)) + assert len(result) == 2 + + def test_bundesland_filter_all_returns_all(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/1005", bundesland="NRW"))) + run(database.upsert_assessment(_assessment("18/1006", bundesland="BY"))) + result = run(database.get_all_assessments(bundesland="ALL")) + assert len(result) == 2 + + def test_bundesland_filter_nrw_only(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/1007", bundesland="NRW"))) + run(database.upsert_assessment(_assessment("18/1008", bundesland="BY"))) + result = run(database.get_all_assessments(bundesland="NRW")) + assert len(result) == 1 + assert result[0]["bundesland"] == "NRW" + + +# ─── delete_assessment ──────────────────────────────────────────────────────── + +class TestDeleteAssessment: + def test_deletes_existing(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/2001"))) + result = run(database.delete_assessment("18/2001")) + assert result is True + assert run(database.get_assessment("18/2001")) is None + + def test_returns_false_for_nonexistent(self, initialized_db): + from app import database + result = run(database.delete_assessment("99/9999")) + assert result is False + + +# ─── assessment_versions ───────────────────────────────────────────────────── + +class TestAssessmentHistory: + def test_empty_history_for_new_assessment(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/3001"))) + history = run(database.get_assessment_history("18/3001")) + assert history == [] + + def test_history_after_update(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/3002", score=5.0))) + run(database.upsert_assessment(_assessment("18/3002", score=7.0))) + history = run(database.get_assessment_history("18/3002")) + assert len(history) == 1 + assert history[0]["gwoe_score"] == 5.0 + + def test_version_increments_on_multiple_saves(self, initialized_db): + from app import database + run(database.upsert_assessment(_assessment("18/3003", score=4.0))) + run(database.upsert_assessment(_assessment("18/3003", score=6.0))) + run(database.upsert_assessment(_assessment("18/3003", score=8.0))) + history = run(database.get_assessment_history("18/3003")) + assert len(history) == 2 + versions = {h["version"] for h in history} + assert versions == {1, 2} + + +# ─── bookmarks ─────────────────────────────────────────────────────────────── + +class TestBookmarks: + def test_toggle_adds_bookmark(self, initialized_db): + from app import database + added = run(database.toggle_bookmark("user1", "18/4001")) + assert added is True + + def test_toggle_removes_existing_bookmark(self, initialized_db): + from app import database + run(database.toggle_bookmark("user1", "18/4002")) + removed = run(database.toggle_bookmark("user1", "18/4002")) + assert removed is False + + def test_get_bookmarks_returns_list(self, initialized_db): + from app import database + run(database.toggle_bookmark("user2", "18/4003")) + run(database.toggle_bookmark("user2", "18/4004")) + bm = run(database.get_bookmarks("user2")) + assert set(bm) == {"18/4003", "18/4004"} + + def test_get_bookmarks_empty_for_unknown_user(self, initialized_db): + from app import database + bm = run(database.get_bookmarks("nobody")) + assert bm == [] + + +# ─── monitoring_scans ──────────────────────────────────────────────────────── + +class TestMonitoringScans: + def test_new_scan_returns_true(self, initialized_db): + from app import database + is_new = run(database.upsert_monitoring_scan( + bundesland="NRW", + drucksache="18/5001", + title="Testantrag", + datum="2026-04-20", + typ="Antrag", + typ_normiert="antrag", + fraktionen=["SPD"], + link="https://example.com", + now="2026-04-20T10:00:00", + )) + assert is_new is True + + def test_second_upsert_returns_false(self, initialized_db): + from app import database + run(database.upsert_monitoring_scan( + bundesland="NRW", drucksache="18/5002", + title="T", datum="2026-04-20", typ="Antrag", + typ_normiert="antrag", fraktionen=[], + link=None, now="2026-04-20T10:00:00", + )) + is_new = run(database.upsert_monitoring_scan( + bundesland="NRW", drucksache="18/5002", + title="T", datum="2026-04-20", typ="Antrag", + typ_normiert="antrag", fraktionen=[], + link=None, now="2026-04-20T11:00:00", + )) + assert is_new is False + + +# ─── monitoring_daily_summary ───────────────────────────────────────────────── + +class TestMonitoringDailySummary: + def test_upsert_and_get_summary(self, initialized_db): + from app import database + run(database.upsert_monitoring_summary( + scan_date="2026-04-20", + bundesland="NRW", + total_seen=10, + new_count=3, + errors=None, + )) + rows = run(database.get_monitoring_summary("2026-04-20")) + assert len(rows) == 1 + assert rows[0]["total_seen"] == 10 + assert rows[0]["new_count"] == 3 + + def test_upsert_summary_updates_on_conflict(self, initialized_db): + from app import database + run(database.upsert_monitoring_summary("2026-04-20", "NRW", 5, 1, None)) + run(database.upsert_monitoring_summary("2026-04-20", "NRW", 15, 4, "Fehler")) + rows = run(database.get_monitoring_summary("2026-04-20")) + assert len(rows) == 1 + assert rows[0]["total_seen"] == 15 + + def test_get_summary_empty_for_unknown_date(self, initialized_db): + from app import database + rows = run(database.get_monitoring_summary("1999-01-01")) + assert rows == [] + + +# ─── email_subscriptions ───────────────────────────────────────────────────── + +class TestEmailSubscriptions: + def test_create_and_list_subscription(self, initialized_db): + from app import database + sub_id = run(database.create_subscription( + user_id="u1", email="test@example.com", + bundesland="NRW", partei="SPD", + )) + assert isinstance(sub_id, int) + subs = run(database.list_subscriptions("u1")) + assert len(subs) == 1 + assert subs[0]["email"] == "test@example.com" + + def test_delete_subscription_own(self, initialized_db): + from app import database + sub_id = run(database.create_subscription("u2", "a@b.com")) + deleted = run(database.delete_subscription("u2", sub_id)) + assert deleted is True + assert run(database.list_subscriptions("u2")) == [] + + def test_delete_subscription_wrong_user_fails(self, initialized_db): + from app import database + sub_id = run(database.create_subscription("u3", "a@b.com")) + deleted = run(database.delete_subscription("wrong_user", sub_id)) + assert deleted is False + + def test_get_all_subscriptions_due_empty(self, initialized_db): + from app import database + due = run(database.get_all_subscriptions_due()) + assert due == [] + + +# ─── _parse_search_query ───────────────────────────────────────────────────── + +class TestParseSearchQuery: + def test_single_term(self): + from app.database import _parse_search_query + terms, is_exact = _parse_search_query("klimaschutz") + assert terms == ["klimaschutz"] + assert is_exact is False + + def test_multi_term_split(self): + from app.database import _parse_search_query + terms, is_exact = _parse_search_query("Klimaschutz Energie") + assert terms == ["klimaschutz", "energie"] + assert is_exact is False + + def test_exact_phrase_in_quotes(self): + from app.database import _parse_search_query + terms, is_exact = _parse_search_query('"Grüner Stahl"') + assert terms == ["grüner stahl"] + assert is_exact is True + + def test_whitespace_stripped(self): + from app.database import _parse_search_query + terms, is_exact = _parse_search_query(" hallo ") + assert terms[0] == "hallo" + + +# ─── Merkliste (#140) ──────────────────────────────────────────────────────── + +class TestMerkliste: + def test_add_and_list(self, initialized_db): + from app import database + run(database.merkliste_add("user1", "18/1001")) + run(database.merkliste_add("user1", "18/1002", notiz="Wichtig")) + entries = run(database.merkliste_list("user1")) + ids = [e["antrag_id"] for e in entries] + assert "18/1001" in ids + assert "18/1002" in ids + + def test_add_with_notiz(self, initialized_db): + from app import database + run(database.merkliste_add("user1", "18/2001", notiz="Mein Kommentar")) + entries = run(database.merkliste_list("user1")) + match = next((e for e in entries if e["antrag_id"] == "18/2001"), None) + assert match is not None + assert match["notiz"] == "Mein Kommentar" + + def test_remove(self, initialized_db): + from app import database + run(database.merkliste_add("user1", "18/3001")) + removed = run(database.merkliste_remove("user1", "18/3001")) + assert removed is True + entries = run(database.merkliste_list("user1")) + assert not any(e["antrag_id"] == "18/3001" for e in entries) + + def test_remove_nonexistent_returns_false(self, initialized_db): + from app import database + removed = run(database.merkliste_remove("user1", "18/9999")) + assert removed is False + + def test_list_empty_for_unknown_user(self, initialized_db): + from app import database + entries = run(database.merkliste_list("unknown_user")) + assert entries == [] + + def test_user_isolation(self, initialized_db): + from app import database + run(database.merkliste_add("userA", "18/5001")) + run(database.merkliste_add("userB", "18/5002")) + a_entries = run(database.merkliste_list("userA")) + b_entries = run(database.merkliste_list("userB")) + assert all(e["antrag_id"] == "18/5001" for e in a_entries) + assert all(e["antrag_id"] == "18/5002" for e in b_entries) + + def test_upsert_idempotent(self, initialized_db): + from app import database + run(database.merkliste_add("user1", "18/6001")) + run(database.merkliste_add("user1", "18/6001")) # zweites Mal + entries = run(database.merkliste_list("user1")) + dupes = [e for e in entries if e["antrag_id"] == "18/6001"] + assert len(dupes) == 1 + + def test_bulk_add(self, initialized_db): + from app import database + entries = [ + {"antrag_id": "18/7001"}, + {"antrag_id": "18/7002", "notiz": "bulk"}, + ] + count = run(database.merkliste_bulk_add("user1", entries)) + assert count == 2 + listed = run(database.merkliste_list("user1")) + ids = [e["antrag_id"] for e in listed] + assert "18/7001" in ids + assert "18/7002" in ids + + def test_bulk_add_skips_missing_antrag_id(self, initialized_db): + from app import database + entries = [ + {"antrag_id": "18/8001"}, + {"notiz": "kein antrag_id"}, # soll übersprungen werden + ] + count = run(database.merkliste_bulk_add("user1", entries)) + assert count == 1 + + def test_bulk_add_no_duplicates(self, initialized_db): + from app import database + run(database.merkliste_add("user1", "18/9001")) + count = run(database.merkliste_bulk_add("user1", [{"antrag_id": "18/9001"}])) + # Do-Nothing bei Konflikt → zählt trotzdem als verarbeitet + assert count == 1 + listed = run(database.merkliste_list("user1")) + assert len([e for e in listed if e["antrag_id"] == "18/9001"]) == 1 diff --git a/tests/test_domain_behavior.py b/tests/test_domain_behavior.py new file mode 100644 index 0000000..3681b45 --- /dev/null +++ b/tests/test_domain_behavior.py @@ -0,0 +1,154 @@ +"""Tests für Domain-Verhalten auf Pydantic-Models (ADR 0008 Tag 4). + +Die neuen Methoden auf ``Assessment`` und ``MatrixEntry`` machen +Invarianten aus dem LLM-System-Prompt server-seitig testbar. Sie +werfen (noch) nicht — der Hintergrund steht in ADR 0008 Kapitel +„Konsequenzen". +""" +from __future__ import annotations + +import pytest + +from app.models import Assessment, Empfehlung, MatrixEntry, Verbesserungspotenzial + + +# ─── MatrixEntry.ist_fundamental_kritisch ────────────────────────────────── + +class TestMatrixEntryFundamentalKritisch: + @pytest.mark.parametrize("rating,expected", [ + (-5, True), (-4, True), + (-3, False), (-1, False), (0, False), + (1, False), (5, False), + ]) + def test_boundary(self, rating, expected): + m = MatrixEntry(field="D4", label="x", aspect="y", rating=rating) + assert m.ist_fundamental_kritisch() is expected + + +# ─── MatrixEntry.to_symbol ───────────────────────────────────────────────── + +class TestMatrixEntrySymbol: + @pytest.mark.parametrize("rating,expected", [ + (5, "++"), (4, "++"), + (3, "+"), (1, "+"), + (0, "○"), + (-1, "−"), (-3, "−"), + (-4, "−−"), (-5, "−−"), + ]) + def test_symbol_from_rating(self, rating, expected): + m = MatrixEntry(field="A1", label="x", aspect="y", rating=rating) + assert m.to_symbol() == expected + + +# ─── Assessment-Behavior ─────────────────────────────────────────────────── + +def _make_assessment(*, score: float = 5.0, empfehlung=Empfehlung.UEBERARBEITEN, + matrix_ratings: list[int] | None = None) -> Assessment: + matrix = [ + MatrixEntry(field="D4", label="x", aspect="y", rating=r) + for r in (matrix_ratings or []) + ] + return Assessment( + drucksache="18/1", + title="Test", + fraktionen=["SPD"], + datum="2024-01-01", + gwoe_score=score, + gwoe_begruendung="Test", + gwoe_matrix=matrix, + gwoe_schwerpunkt=[], + wahlprogramm_scores=[], + empfehlung=empfehlung, + verbesserungspotenzial=Verbesserungspotenzial.MITTEL, + ) + + +class TestAssessmentEmpfehlungHelpers: + def test_ist_ablehnung_true_for_ablehnen(self): + a = _make_assessment(empfehlung=Empfehlung.ABLEHNEN, score=1.0) + assert a.ist_ablehnung() is True + + def test_ist_ablehnung_false_otherwise(self): + a = _make_assessment(empfehlung=Empfehlung.UNTERSTUETZEN_MIT) + assert a.ist_ablehnung() is False + + def test_ist_uneingeschraenkt(self): + a = _make_assessment(empfehlung=Empfehlung.UNEINGESCHRAENKT, score=9.0) + assert a.ist_uneingeschraenkt_unterstuetzend() is True + + def test_ist_uneingeschraenkt_false_for_other(self): + a = _make_assessment(empfehlung=Empfehlung.UEBERARBEITEN) + assert a.ist_uneingeschraenkt_unterstuetzend() is False + + +class TestAssessmentHatFundamentalKritischesFeld: + def test_empty_matrix(self): + a = _make_assessment(matrix_ratings=[]) + assert a.hat_fundamental_kritisches_feld() is False + + def test_no_critical_field(self): + a = _make_assessment(matrix_ratings=[2, 3, -1, -3]) + assert a.hat_fundamental_kritisches_feld() is False + + def test_single_critical_field(self): + a = _make_assessment(matrix_ratings=[2, -4, 1]) + assert a.hat_fundamental_kritisches_feld() is True + + def test_multiple_critical_fields(self): + a = _make_assessment(matrix_ratings=[-4, -5, -4]) + assert a.hat_fundamental_kritisches_feld() is True + + +class TestAssessmentVerletztScoreCap: + """Die zentrale Invariante: Bei rating ≤ -4 muss gwoe_score ≤ 3.""" + + def test_no_critical_field_never_violates(self): + a = _make_assessment(score=10.0, matrix_ratings=[1, 2, -3]) + assert a.verletzt_score_cap() is False + + def test_critical_field_with_high_score_violates(self): + a = _make_assessment(score=7.0, matrix_ratings=[-4]) + assert a.verletzt_score_cap() is True + + def test_critical_field_with_capped_score_ok(self): + a = _make_assessment(score=3.0, matrix_ratings=[-4]) + assert a.verletzt_score_cap() is False + + def test_critical_field_with_score_exactly_cap_ok(self): + """Boundary: 3.0 ist noch OK, 3.01 wäre Verletzung.""" + a = _make_assessment(score=3.0, matrix_ratings=[-5]) + assert a.verletzt_score_cap() is False + + def test_critical_field_with_score_just_above_cap_violates(self): + a = _make_assessment(score=3.1, matrix_ratings=[-5]) + assert a.verletzt_score_cap() is True + + +class TestAssessmentMethodsCoexistWithSerialization: + """Sanity-Check: Die neuen Methoden brechen nicht die Pydantic- + Serialisierung, die von der DB/API-Grenze gebraucht wird.""" + + def test_model_dump_still_works(self): + a = _make_assessment(score=5.0, matrix_ratings=[2, -1]) + dumped = a.model_dump(by_alias=True) + assert dumped["gwoeScore"] == 5.0 + assert "ist_ablehnung" not in dumped # Methoden leaken nicht in JSON + + def test_model_validate_via_alias(self): + data = { + "drucksache": "18/2", + "title": "t", + "fraktionen": [], + "datum": "2024-01-01", + "gwoeScore": 8.0, + "gwoeBegründung": "x", + "gwoeMatrix": [ + {"field": "A1", "label": "x", "aspect": "y", "rating": 3} + ], + "gwoeSchwerpunkt": [], + "wahlprogrammScores": [], + "empfehlung": "Uneingeschränkt unterstützen", + "verbesserungspotenzial": "gering", + } + a = Assessment.model_validate(data) + assert a.ist_uneingeschraenkt_unterstuetzend() is True diff --git a/tests/test_drucksache_typen.py b/tests/test_drucksache_typen.py new file mode 100644 index 0000000..3c35827 --- /dev/null +++ b/tests/test_drucksache_typen.py @@ -0,0 +1,204 @@ +"""Direkte Unit-Tests für app/drucksache_typen.py (#134 Phase 2). + +Testet normalize_typ() und ist_abstimmbar() direkt — nicht indirekt via +test_parlamente.py. Deckt bekannte BL-spezifische Strings, Edge-Cases +und den TH-Bug-Pattern ("Antrag gemäß § 79 GO") ab. +""" + +import pytest + +from app.drucksache_typen import ( + ANTRAG, + GESETZENTWURF, + AENDERUNGSANTRAG, + DRINGLICHKEITSANTRAG, + ENTSCHLIESSUNGSANTRAG, + BESCHLUSSEMPFEHLUNG, + KLEINE_ANFRAGE, + GROSSE_ANFRAGE, + UNTERRICHTUNG, + PETITION, + BERICHT, + SONSTIGE, + ABSTIMMBARE_TYPEN, + normalize_typ, + ist_abstimmbar, + ist_abstimmbar_original, +) + + +# ─── normalize_typ ──────────────────────────────────────────────────────────── + +class TestNormalizTyp: + # --- Standard-Antragstypen verschiedener BL --- + + def test_antrag_nrw(self): + assert normalize_typ("Antrag") == ANTRAG + + def test_antrag_case_insensitive(self): + assert normalize_typ("ANTRAG") == ANTRAG + + def test_antrag_lowercase(self): + assert normalize_typ("antrag") == ANTRAG + + def test_gesetzentwurf(self): + assert normalize_typ("Gesetzentwurf") == GESETZENTWURF + + def test_gesetzentwurf_with_prefix(self): + # Bayern: "Dringlicher Gesetzentwurf" + assert normalize_typ("Dringlicher Gesetzentwurf") == GESETZENTWURF + + def test_aenderungsantrag(self): + assert normalize_typ("Änderungsantrag") == AENDERUNGSANTRAG + + def test_aenderungsantrag_ascii(self): + assert normalize_typ("Aenderungsantrag") == AENDERUNGSANTRAG + + def test_dringlichkeitsantrag(self): + assert normalize_typ("Dringlichkeitsantrag") == DRINGLICHKEITSANTRAG + + def test_entschliessungsantrag(self): + assert normalize_typ("Entschließungsantrag") == ENTSCHLIESSUNGSANTRAG + + def test_entschliessungsantrag_ascii(self): + assert normalize_typ("Entschliessungsantrag") == ENTSCHLIESSUNGSANTRAG + + # --- BL-spezifische Nicht-Antrags-Typen --- + + def test_kleine_anfrage(self): + assert normalize_typ("Kleine Anfrage") == KLEINE_ANFRAGE + + def test_kleine_anfrage_nrw_format(self): + # NRW: "Kleine Anfrage 4178 von ..." + assert normalize_typ("Kleine Anfrage 4178 von Abgeordneten") == KLEINE_ANFRAGE + + def test_grosse_anfrage(self): + assert normalize_typ("Große Anfrage") == GROSSE_ANFRAGE + + def test_grosse_anfrage_ascii(self): + assert normalize_typ("Grosse Anfrage") == GROSSE_ANFRAGE + + def test_anfrage_generic_fallback(self): + # "Anfrage" ohne Große/Kleine → KLEINE_ANFRAGE per Fallback-Regel + assert normalize_typ("Anfrage") == KLEINE_ANFRAGE + + def test_beschlussempfehlung(self): + assert normalize_typ("Beschlussempfehlung") == BESCHLUSSEMPFEHLUNG + + def test_unterrichtung(self): + assert normalize_typ("Unterrichtung") == UNTERRICHTUNG + + def test_mitteilung_maps_to_unterrichtung(self): + assert normalize_typ("Mitteilung") == UNTERRICHTUNG + + def test_vorlage_maps_to_unterrichtung(self): + assert normalize_typ("Vorlage") == UNTERRICHTUNG + + def test_bericht(self): + assert normalize_typ("Bericht") == BERICHT + + def test_petition(self): + assert normalize_typ("Petition") == PETITION + + # --- TH-Bug-Pattern: "Antrag gemäß § 79 GO" muss als ANTRAG erkannt werden --- + + def test_th_antrag_gemaess_paragraph(self): + """Regression: TH-Landtag verwendet 'Antrag gemäß § 79 GO'. + Das spezifischere Pattern 'antrag gemäß' liegt vor 'antrag' in der + Map, daher wird es korrekt als ANTRAG normalisiert.""" + assert normalize_typ("Antrag gemäß § 79 GO") == ANTRAG + + def test_th_antrag_gemaess_case_insensitive(self): + assert normalize_typ("ANTRAG GEMÄSS § 79 GO") == ANTRAG + + def test_th_antrag_gemaess_lowercase(self): + assert normalize_typ("antrag gemäß § 89 thürgo") == ANTRAG + + # --- Edge-Cases --- + + def test_empty_string(self): + assert normalize_typ("") == SONSTIGE + + def test_whitespace_only(self): + assert normalize_typ(" ") == SONSTIGE + + def test_unknown_type(self): + assert normalize_typ("Völlig unbekannter Drucksachentyp XYZ") == SONSTIGE + + def test_drucksache_generic_maps_to_sonstige(self): + # NRW liefert manchmal nur "Drucksache" ohne Typ-Angabe + assert normalize_typ("Drucksache") == SONSTIGE + + def test_leading_trailing_whitespace_stripped(self): + assert normalize_typ(" Antrag ") == ANTRAG + + def test_substring_match_within_longer_string(self): + # "Gesetzentwurf der Landesregierung" + assert normalize_typ("Gesetzentwurf der Landesregierung") == GESETZENTWURF + + +# ─── ist_abstimmbar ─────────────────────────────────────────────────────────── + +class TestIstAbstimmbar: + def test_antrag_abstimmbar(self): + assert ist_abstimmbar(ANTRAG) is True + + def test_gesetzentwurf_abstimmbar(self): + assert ist_abstimmbar(GESETZENTWURF) is True + + def test_aenderungsantrag_abstimmbar(self): + assert ist_abstimmbar(AENDERUNGSANTRAG) is True + + def test_dringlichkeitsantrag_abstimmbar(self): + assert ist_abstimmbar(DRINGLICHKEITSANTRAG) is True + + def test_entschliessungsantrag_abstimmbar(self): + assert ist_abstimmbar(ENTSCHLIESSUNGSANTRAG) is True + + def test_kleine_anfrage_nicht_abstimmbar(self): + assert ist_abstimmbar(KLEINE_ANFRAGE) is False + + def test_grosse_anfrage_nicht_abstimmbar(self): + assert ist_abstimmbar(GROSSE_ANFRAGE) is False + + def test_beschlussempfehlung_nicht_abstimmbar(self): + assert ist_abstimmbar(BESCHLUSSEMPFEHLUNG) is False + + def test_unterrichtung_nicht_abstimmbar(self): + assert ist_abstimmbar(UNTERRICHTUNG) is False + + def test_bericht_nicht_abstimmbar(self): + assert ist_abstimmbar(BERICHT) is False + + def test_petition_nicht_abstimmbar(self): + assert ist_abstimmbar(PETITION) is False + + def test_sonstige_durchgelassen(self): + """SONSTIGE wird durchgelassen (benefit of the doubt), damit Anträge + ohne erkennbaren Typ nicht fälschlich geblockt werden.""" + assert ist_abstimmbar(SONSTIGE) is True + + def test_abstimmbare_typen_vollstaendig(self): + """Alle normierten abstimmbaren Typen müssen True ergeben.""" + for t in ABSTIMMBARE_TYPEN: + assert ist_abstimmbar(t) is True, f"{t!r} sollte abstimmbar sein" + + +# ─── ist_abstimmbar_original (Convenience) ─────────────────────────────────── + +class TestIstAbstimmbarOriginal: + def test_antrag_string_abstimmbar(self): + assert ist_abstimmbar_original("Antrag") is True + + def test_th_antrag_gemaess_abstimmbar(self): + assert ist_abstimmbar_original("Antrag gemäß § 79 GO") is True + + def test_kleine_anfrage_string_nicht_abstimmbar(self): + assert ist_abstimmbar_original("Kleine Anfrage") is False + + def test_empty_string_durchgelassen(self): + # Leerer String → SONSTIGE → True + assert ist_abstimmbar_original("") is True + + def test_gesetzentwurf_string_abstimmbar(self): + assert ist_abstimmbar_original("Gesetzentwurf der Fraktionen") is True diff --git a/tests/test_embeddings_v3_v4.py b/tests/test_embeddings_v3_v4.py new file mode 100644 index 0000000..272fb3c --- /dev/null +++ b/tests/test_embeddings_v3_v4.py @@ -0,0 +1,266 @@ +"""Tests für das WRITE/READ-Pattern der v3→v4-Embedding-Migration (ADR 0006, Issue #123). + +Alle Tests verwenden eine gestubbte SQLite-In-Memory-DB und mocken den +OpenAI-Client — kein echter API-Aufruf findet statt. +""" + +import json +import sqlite3 +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers — gestubbte DB und Fake-Embeddings +# --------------------------------------------------------------------------- + +def _make_db(path: str) -> sqlite3.Connection: + """Erstelle leere chunks-Tabelle mit model-Spalte.""" + conn = sqlite3.connect(path) + conn.execute(""" + CREATE TABLE chunks ( + id INTEGER PRIMARY KEY, + programm_id TEXT NOT NULL, + partei TEXT NOT NULL, + typ TEXT NOT NULL, + seite INTEGER, + text TEXT NOT NULL, + embedding BLOB NOT NULL, + bundesland TEXT, + model TEXT NOT NULL DEFAULT 'text-embedding-v3', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.execute("CREATE INDEX idx_chunks_model ON chunks(model)") + conn.commit() + return conn + + +def _vec(seed: float, dim: int = 4) -> list[float]: + """Einfacher Einheits-Vektor für Cosine-Tests (dim klein für Geschwindigkeit).""" + v = [seed * (i + 1) for i in range(dim)] + norm = sum(x * x for x in v) ** 0.5 + return [x / norm for x in v] + + +def _insert_chunk(conn, programm_id, partei, typ, text, model, seite=1, bundesland=None): + emb = _vec(0.9) + conn.execute( + "INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland, model) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (programm_id, partei, typ, seite, text, json.dumps(emb).encode(), bundesland, model), + ) + conn.commit() + + +# --------------------------------------------------------------------------- +# Test 1: Query mit aktivem READ-Modell findet nur v4-Chunks, ignoriert v3 +# --------------------------------------------------------------------------- + +def test_query_filters_by_read_model(tmp_path, monkeypatch): + """find_relevant_chunks filtert auf EMBEDDING_MODEL_READ; v3-Rows werden ignoriert.""" + db_path = tmp_path / "embeddings.db" + conn = _make_db(str(db_path)) + + # Einen v3- und einen v4-Chunk einfügen + _insert_chunk(conn, "spd-nrw-2022", "SPD", "wahlprogramm", + "Klimaschutz ist unsere Priorität v3", "text-embedding-v3") + _insert_chunk(conn, "spd-nrw-2022", "SPD", "wahlprogramm", + "Klimaschutz ist unsere Priorität v4", "text-embedding-v4") + conn.close() + + # READ = v4 + query_vec = _vec(0.9) + + import app.embeddings as emb_mod + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL_READ", "text-embedding-v4") + monkeypatch.setattr(emb_mod, "create_embedding", lambda text, model=None: query_vec) + + results = emb_mod.find_relevant_chunks("Klimaschutz", min_similarity=0.0) + texts = [r["text"] for r in results] + assert any("v4" in t for t in texts), "v4-Chunk muss im Ergebnis sein" + assert not any("v3" in t for t in texts), "v3-Chunk darf bei READ=v4 nicht zurückgegeben werden" + + +# --------------------------------------------------------------------------- +# Test 2: index_programm schreibt in WRITE-Modell +# --------------------------------------------------------------------------- + +def test_index_programm_writes_to_write_model(tmp_path, monkeypatch): + """index_programm persistiert Chunks mit dem konfigurierten EMBEDDING_MODEL (write).""" + import app.embeddings as emb_mod + + db_path = tmp_path / "embeddings.db" + # Erstelle leere DB mit Schema + conn = _make_db(str(db_path)) + conn.close() + + pdf_dir = tmp_path / "pdfs" + pdf_dir.mkdir() + + # Stub: PDF-Extraktion gibt einen Fake-Page zurück + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL", "text-embedding-v4") + monkeypatch.setattr( + emb_mod, "extract_text_with_pages", + lambda path: [(1, "Gemeinwohl Solidarität Nachhaltigkeit " * 10)] + ) + monkeypatch.setattr(emb_mod, "create_embedding", lambda text, model=None: _vec(0.5)) + + # PDF-Datei muss existieren (nur die exists()-Prüfung) + fake_pdf = pdf_dir / "spd-nrw-2022.pdf" + fake_pdf.write_bytes(b"%PDF-1.4 fake") + + count = emb_mod.index_programm("spd-nrw-2022", pdf_dir) + assert count > 0, "index_programm muss mindestens einen Chunk indexieren" + + conn = sqlite3.connect(str(db_path)) + rows = conn.execute("SELECT model FROM chunks WHERE programm_id='spd-nrw-2022'").fetchall() + conn.close() + + assert rows, "Es müssen Rows in der DB sein" + for (model,) in rows: + assert model == "text-embedding-v4", f"Gespeichertes Modell sollte text-embedding-v4 sein, ist {model!r}" + + +# --------------------------------------------------------------------------- +# Test 3: READ-Wechsel — neuer Chunk nach Switch nur im neuen Modell-Raum +# --------------------------------------------------------------------------- + +def test_read_switch_sees_only_new_model_chunks(tmp_path, monkeypatch): + """Nach Switch READ=v4 liefert find_relevant_chunks nur v4-Rows, nicht v3.""" + db_path = tmp_path / "embeddings.db" + conn = _make_db(str(db_path)) + + # Mehrere v3-Rows (alte Prod-Chunks) + for i in range(3): + _insert_chunk(conn, "cdu-nrw-2022", "CDU", "wahlprogramm", + f"Wirtschaft und Arbeit v3 chunk {i}", "text-embedding-v3") + # Ein neuer v4-Row nach Reindex + _insert_chunk(conn, "cdu-nrw-2022", "CDU", "wahlprogramm", + "Wirtschaft und Arbeit v4 chunk 0", "text-embedding-v4") + conn.close() + + import app.embeddings as emb_mod + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL_READ", "text-embedding-v4") + monkeypatch.setattr(emb_mod, "create_embedding", lambda text, model=None: _vec(0.7)) + + results = emb_mod.find_relevant_chunks("Wirtschaft", min_similarity=0.0) + assert len(results) == 1 + assert "v4" in results[0]["text"] + + +# --------------------------------------------------------------------------- +# Test 4: Gemischte DB — Query filtert modell-korrekt +# --------------------------------------------------------------------------- + +def test_mixed_db_query_filtered_correctly(tmp_path, monkeypatch): + """Bei DB mit v3 + v4 Rows für mehrere Parteien liefert Query nur READ-Modell-Rows.""" + db_path = tmp_path / "embeddings.db" + conn = _make_db(str(db_path)) + + parties = ["SPD", "CDU", "GRÜNE"] + for partei in parties: + _insert_chunk(conn, f"{partei.lower()}-prog", partei, "wahlprogramm", + f"{partei} Programm v3 Text", "text-embedding-v3") + _insert_chunk(conn, f"{partei.lower()}-prog", partei, "wahlprogramm", + f"{partei} Programm v4 Text", "text-embedding-v4") + conn.close() + + import app.embeddings as emb_mod + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL_READ", "text-embedding-v4") + monkeypatch.setattr(emb_mod, "create_embedding", lambda text, model=None: _vec(0.8)) + + results = emb_mod.find_relevant_chunks("Programm", min_similarity=0.0, top_k=20) + for r in results: + # Alle zurückgegebenen Chunks müssen aus dem READ-Modell-Raum kommen + # (wir können model nicht direkt prüfen, aber den text-Suffix) + assert "v4" in r["text"], f"Unerwarteter v3-Chunk: {r['text']!r}" + assert len(results) == len(parties), "Je eine v4-Row pro Partei erwartet" + + +# --------------------------------------------------------------------------- +# Test 5: Index DELETE löscht nur WRITE-Modell-Rows — v3-Rows bleiben +# --------------------------------------------------------------------------- + +def test_reindex_deletes_only_write_model_rows(tmp_path, monkeypatch): + """Beim Reindex (index_programm) werden alte v4-Rows gelöscht, v3 bleibt erhalten.""" + db_path = tmp_path / "embeddings.db" + conn = _make_db(str(db_path)) + + # Vorhandene v3-Row (aus alter Migration) + _insert_chunk(conn, "spd-nrw-2022", "SPD", "wahlprogramm", + "Alte v3 Zeile bleibt stehen", "text-embedding-v3") + # Vorhandene v4-Row (wird beim Reindex ersetzt) + _insert_chunk(conn, "spd-nrw-2022", "SPD", "wahlprogramm", + "Alte v4 Zeile wird gelöscht", "text-embedding-v4") + conn.close() + + import app.embeddings as emb_mod + pdf_dir = tmp_path / "pdfs" + pdf_dir.mkdir() + fake_pdf = pdf_dir / "spd-nrw-2022.pdf" + fake_pdf.write_bytes(b"%PDF-1.4 fake") + + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL", "text-embedding-v4") + monkeypatch.setattr( + emb_mod, "extract_text_with_pages", + lambda path: [(1, "Neue v4 Zeile nach Reindex " * 10)] + ) + monkeypatch.setattr(emb_mod, "create_embedding", lambda text, model=None: _vec(0.6)) + + emb_mod.index_programm("spd-nrw-2022", pdf_dir) + + conn = sqlite3.connect(str(db_path)) + v3_rows = conn.execute( + "SELECT text FROM chunks WHERE programm_id='spd-nrw-2022' AND model='text-embedding-v3'" + ).fetchall() + v4_rows = conn.execute( + "SELECT text FROM chunks WHERE programm_id='spd-nrw-2022' AND model='text-embedding-v4'" + ).fetchall() + conn.close() + + assert len(v3_rows) == 1, "v3-Row muss erhalten bleiben" + assert "Alte v3 Zeile" in v3_rows[0][0] + assert all("Alte v4 Zeile" not in r[0] for r in v4_rows), "Alte v4-Row muss ersetzt worden sein" + assert any("Neue v4 Zeile" in r[0] for r in v4_rows), "Neue v4-Rows müssen vorhanden sein" + + +# --------------------------------------------------------------------------- +# Test 6: Query-Embedding nutzt READ-Modell als model-Parameter +# --------------------------------------------------------------------------- + +def test_query_embedding_uses_read_model(tmp_path, monkeypatch): + """find_relevant_chunks ruft create_embedding mit EMBEDDING_MODEL_READ auf.""" + db_path = tmp_path / "embeddings.db" + conn = _make_db(str(db_path)) + _insert_chunk(conn, "spd-nrw-2022", "SPD", "wahlprogramm", + "Solidarität v4", "text-embedding-v4") + conn.close() + + import app.embeddings as emb_mod + monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", db_path) + monkeypatch.setattr(emb_mod, "EMBEDDING_MODEL_READ", "text-embedding-v4") + + called_with_model = [] + + def _fake_create_embedding(text, model=None): + called_with_model.append(model) + return _vec(0.9) + + monkeypatch.setattr(emb_mod, "create_embedding", _fake_create_embedding) + + emb_mod.find_relevant_chunks("Solidarität", min_similarity=0.0) + + assert called_with_model, "create_embedding muss aufgerufen worden sein" + assert called_with_model[0] == "text-embedding-v4", ( + f"Query-Embedding muss mit READ-Modell erzeugt werden, war aber {called_with_model[0]!r}" + ) diff --git a/tests/test_llm_bewerter.py b/tests/test_llm_bewerter.py new file mode 100644 index 0000000..3e1ae31 --- /dev/null +++ b/tests/test_llm_bewerter.py @@ -0,0 +1,137 @@ +"""Tests für LlmBewerter-Port und QwenBewerter-Adapter (ADR 0008). + +Der Adapter wird mit einem Fake-Client getestet — kein Netzwerk, kein +``openai``-Paket. Retry-Semantik (Temperatur steigt um 0.1 pro Versuch) +ist hier explizit getestet, damit die Migration die Semantik nicht +still verändert. +""" +from __future__ import annotations + +import asyncio +import json +import types + +import pytest + +from app.adapters.qwen_bewerter import QwenBewerter, _strip_markdown_fences +from app.ports.llm_bewerter import LlmBewerter, LlmRequest + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def _make_fake_client(responses: list[str]): + """Produziert einen Fake-OpenAI-Client, der pro Call einen Response aus + der Liste liefert und Metadaten (Temperatur) aufzeichnet.""" + calls: list[dict] = [] + + class FakeCompletions: + async def create(self, **kwargs): + calls.append(dict(kwargs)) + idx = len(calls) - 1 + content = responses[min(idx, len(responses) - 1)] + return types.SimpleNamespace( + choices=[types.SimpleNamespace( + message=types.SimpleNamespace(content=content) + )] + ) + + class FakeChat: + completions = FakeCompletions() + + class FakeClient: + chat = FakeChat() + + return FakeClient(), calls + + +# ─── Strip-Fences ────────────────────────────────────────────────────────── + +class TestStripMarkdownFences: + def test_plain_json_unchanged(self): + assert _strip_markdown_fences('{"a": 1}') == '{"a": 1}' + + def test_json_fence(self): + assert _strip_markdown_fences('```json\n{"a": 1}\n```') == '{"a": 1}' + + def test_plain_fence(self): + assert _strip_markdown_fences('```\n{"a": 1}\n```') == '{"a": 1}' + + +# ─── Protocol-Konformität ────────────────────────────────────────────────── + +class TestProtocol: + def test_qwen_implements_llm_bewerter(self): + # runtime_checkable Protocol — Method bewerte existiert + qb = QwenBewerter(api_key="x", base_url="y", client=object()) + assert isinstance(qb, LlmBewerter) + + +# ─── QwenBewerter mit FakeClient ─────────────────────────────────────────── + +class TestQwenBewerterHappyPath: + def test_single_successful_call(self): + fake, calls = _make_fake_client(['{"gwoeScore": 7.0}']) + qb = QwenBewerter(api_key="x", base_url="y", client=fake) + request = LlmRequest(system_prompt="sys", user_prompt="usr") + result = _run(qb.bewerte(request)) + assert result == {"gwoeScore": 7.0} + assert len(calls) == 1 + assert calls[0]["temperature"] == pytest.approx(0.3) + + def test_markdown_fence_is_stripped(self): + fake, _ = _make_fake_client(['```json\n{"gwoeScore": 8.0}\n```']) + qb = QwenBewerter(client=fake) + result = _run(qb.bewerte(LlmRequest("sys", "usr"))) + assert result == {"gwoeScore": 8.0} + + def test_passes_model_through(self): + fake, calls = _make_fake_client(['{"a": 1}']) + qb = QwenBewerter(client=fake) + _run(qb.bewerte(LlmRequest("sys", "usr", model="qwen-turbo"))) + assert calls[0]["model"] == "qwen-turbo" + + +class TestQwenBewerterRetries: + def test_retry_raises_temperature(self): + """Bei JSON-Parse-Fehler steigt die Temperatur um 0.1 pro Versuch.""" + fake, calls = _make_fake_client([ + "nicht valides JSON", + "immer noch kaputt", + '{"gwoeScore": 6.0}', # 3. Versuch klappt + ]) + qb = QwenBewerter(client=fake) + request = LlmRequest("sys", "usr", max_retries=3) + result = _run(qb.bewerte(request)) + assert result == {"gwoeScore": 6.0} + assert len(calls) == 3 + assert calls[0]["temperature"] == pytest.approx(0.3) + assert calls[1]["temperature"] == pytest.approx(0.4) + assert calls[2]["temperature"] == pytest.approx(0.5) + + def test_exhausted_retries_raise(self): + fake, _ = _make_fake_client([ + "kaputt", "kaputt", "kaputt", + ]) + qb = QwenBewerter(client=fake) + request = LlmRequest("sys", "usr", max_retries=3) + with pytest.raises(json.JSONDecodeError): + _run(qb.bewerte(request)) + + def test_single_retry_is_respected(self): + """max_retries=1 heißt: genau ein Versuch, kein Retry.""" + fake, calls = _make_fake_client(["kaputt"]) + qb = QwenBewerter(client=fake) + with pytest.raises(json.JSONDecodeError): + _run(qb.bewerte(LlmRequest("sys", "usr", max_retries=1))) + assert len(calls) == 1 + + +class TestLlmRequestDefaults: + def test_defaults_match_legacy_analyzer(self): + req = LlmRequest("s", "u") + assert req.model == "qwen-plus" + assert req.max_retries == 3 + assert req.max_tokens == 4000 + assert req.base_temperature == 0.3 diff --git a/tests/test_mail.py b/tests/test_mail.py new file mode 100644 index 0000000..9a4b62b --- /dev/null +++ b/tests/test_mail.py @@ -0,0 +1,354 @@ +"""Unit-Tests für app/mail.py (#134 Phase 2). + +Testet Unsubscribe-Token-Round-Trip, Digest-Komposition, Filter-Logik +und run_daily_digest() mit leerer Subscription-Tabelle. SMTP-Calls +werden via monkeypatch/unittest.mock gestubbt — kein echter Netzwerk-Call. +""" +from __future__ import annotations + +import asyncio +import sys +import types +from unittest.mock import AsyncMock, MagicMock, patch, patch as _patch + +import pytest + +# aiosqlite ist im Unit-Test-Environment nicht installiert — stub before database import +if "aiosqlite" not in sys.modules: + _aio = types.ModuleType("aiosqlite") + sys.modules["aiosqlite"] = _aio + +# ─── Import-Vorbereitung ───────────────────────────────────────────────────── +# config.py importiert pydantic_settings — conftest stubbt das bereits, +# aber für den direkten Mail-Test laden wir nochmal explizit ab. + +from app.mail import ( + _unsubscribe_token, + verify_unsubscribe_token, + unsubscribe_url, + compose_digest, + _filter_assessments, + run_daily_digest, +) +from app.config import settings + + +# ─── Hilfsfixtures ─────────────────────────────────────────────────────────── + +def _make_sub(id: int = 1, email: str = "test@example.com", + bundesland: str | None = None, partei: str | None = None, + last_sent: str | None = None) -> dict: + return { + "id": id, + "email": email, + "bundesland": bundesland, + "partei": partei, + "last_sent": last_sent, + "frequency": "daily", + } + + +def _make_assessment(drucksache: str = "18/1234", + title: str = "Testantrag", + bundesland: str = "NRW", + fraktionen: list[str] | None = None, + gwoe_score: int = 6, + empfehlung: str = "Empfohlen", + updated_at: str = "2026-04-20T10:00:00") -> dict: + return { + "drucksache": drucksache, + "title": title, + "bundesland": bundesland, + "fraktionen": fraktionen or ["SPD"], + "gwoe_score": gwoe_score, + "empfehlung": empfehlung, + "antrag_zusammenfassung": "Eine kurze Zusammenfassung.", + "updated_at": updated_at, + } + + +# ─── Unsubscribe-Token ──────────────────────────────────────────────────────── + +class TestUnsubscribeToken: + def test_round_trip_valid(self): + token = _unsubscribe_token(42) + assert verify_unsubscribe_token(42, token) is True + + def test_wrong_sub_id_rejected(self): + token = _unsubscribe_token(42) + assert verify_unsubscribe_token(99, token) is False + + def test_tampered_token_rejected(self): + token = _unsubscribe_token(1) + tampered = token[:-1] + ("X" if token[-1] != "X" else "Y") + assert verify_unsubscribe_token(1, tampered) is False + + def test_token_is_urlsafe_string(self): + """Token darf keine +, / oder = enthalten (URL-Sicherheit).""" + token = _unsubscribe_token(7) + assert "+" not in token + assert "/" not in token + assert "=" not in token + + def test_token_length_22(self): + token = _unsubscribe_token(1) + assert len(token) == 22 + + def test_different_ids_produce_different_tokens(self): + t1 = _unsubscribe_token(1) + t2 = _unsubscribe_token(2) + assert t1 != t2 + + def test_unsubscribe_url_contains_base_url_and_token(self): + url = unsubscribe_url(5) + token = _unsubscribe_token(5) + assert settings.base_url in url + assert "/unsubscribe/5/" in url + assert token in url + + +# ─── _filter_assessments ───────────────────────────────────────────────────── + +class TestFilterAssessments: + def test_no_filter_returns_all(self): + rows = [_make_assessment(bundesland="NRW"), _make_assessment(bundesland="BY")] + result = _filter_assessments(rows, bundesland=None, partei=None, since=None) + assert len(result) == 2 + + def test_bundesland_filter_nrw_only(self): + rows = [ + _make_assessment(bundesland="NRW"), + _make_assessment(bundesland="BY"), + _make_assessment(bundesland="NRW"), + ] + result = _filter_assessments(rows, bundesland="NRW", partei=None, since=None) + assert len(result) == 2 + assert all(r["bundesland"] == "NRW" for r in result) + + def test_bundesland_filter_empty_result(self): + rows = [_make_assessment(bundesland="BY")] + result = _filter_assessments(rows, bundesland="NRW", partei=None, since=None) + assert result == [] + + def test_partei_filter_case_insensitive(self): + rows = [ + _make_assessment(fraktionen=["SPD", "GRÜNE"]), + _make_assessment(fraktionen=["CDU"]), + ] + result = _filter_assessments(rows, bundesland=None, partei="spd", since=None) + assert len(result) == 1 + assert "SPD" in result[0]["fraktionen"] + + def test_partei_filter_no_match(self): + rows = [_make_assessment(fraktionen=["CDU"])] + result = _filter_assessments(rows, bundesland=None, partei="FDP", since=None) + assert result == [] + + def test_since_filter_excludes_older(self): + rows = [ + _make_assessment(updated_at="2026-04-19T10:00:00"), # vor since → raus + _make_assessment(updated_at="2026-04-20T10:00:00"), # gleich since → raus (<=) + _make_assessment(updated_at="2026-04-21T10:00:00"), # nach since → drin + ] + result = _filter_assessments(rows, bundesland=None, partei=None, + since="2026-04-20T10:00:00") + assert len(result) == 1 + assert result[0]["updated_at"] == "2026-04-21T10:00:00" + + def test_combined_bundesland_and_partei_filter(self): + rows = [ + _make_assessment(bundesland="NRW", fraktionen=["SPD"]), + _make_assessment(bundesland="NRW", fraktionen=["CDU"]), + _make_assessment(bundesland="BY", fraktionen=["SPD"]), + ] + result = _filter_assessments(rows, bundesland="NRW", partei="SPD", since=None) + assert len(result) == 1 + assert result[0]["bundesland"] == "NRW" + assert "SPD" in result[0]["fraktionen"] + + def test_none_fraktionen_handled(self): + rows = [{"drucksache": "x", "bundesland": "NRW", "fraktionen": None, + "updated_at": "2026-04-20T10:00:00"}] + result = _filter_assessments(rows, bundesland=None, partei="SPD", since=None) + assert result == [] + + +# ─── compose_digest ────────────────────────────────────────────────────────── + +class TestComposeDigest: + def test_subject_contains_count_and_filter_label(self): + sub = _make_sub(bundesland="NRW", partei="SPD") + assessments = [_make_assessment(), _make_assessment(drucksache="18/5678")] + subject, _, _ = compose_digest(sub, assessments) + assert "2" in subject + assert "NRW" in subject + assert "SPD" in subject + + def test_subject_singular_for_one_assessment(self): + sub = _make_sub() + subject, _, _ = compose_digest(sub, [_make_assessment()]) + # "Bewertung" ohne "en" bei n=1 + assert "Bewertung" in subject + assert "Bewertungen" not in subject + + def test_subject_plural_for_multiple(self): + sub = _make_sub() + rows = [_make_assessment(drucksache=f"18/{i}") for i in range(3)] + subject, _, _ = compose_digest(sub, rows) + assert "Bewertungen" in subject + + def test_filter_label_all_when_no_filter(self): + sub = _make_sub() # kein BL/Partei + subject, text, _ = compose_digest(sub, [_make_assessment()]) + assert "alle Bundesländer" in text + + def test_text_body_contains_assessment_title(self): + sub = _make_sub() + row = _make_assessment(title="Klimaschutzantrag NRW") + _, text, _ = compose_digest(sub, [row]) + assert "Klimaschutzantrag NRW" in text + + def test_text_body_contains_unsubscribe_url(self): + sub = _make_sub(id=7) + _, text, _ = compose_digest(sub, [_make_assessment()]) + token = _unsubscribe_token(7) + assert token in text + assert "/unsubscribe/7/" in text + + def test_html_body_is_valid_html(self): + sub = _make_sub() + _, _, html_body = compose_digest(sub, [_make_assessment()]) + assert "" in html_body + assert "" in html_body + + def test_html_body_contains_score(self): + sub = _make_sub() + row = _make_assessment(gwoe_score=8) + _, _, html_body = compose_digest(sub, [row]) + assert "8/10" in html_body + + def test_truncation_at_20_assessments(self): + sub = _make_sub() + rows = [_make_assessment(drucksache=f"18/{i}") for i in range(25)] + _, text, html_body = compose_digest(sub, rows) + assert "und 5 weitere" in text + assert "5 weitere" in html_body + + def test_no_truncation_marker_for_20_or_fewer(self): + sub = _make_sub() + rows = [_make_assessment(drucksache=f"18/{i}") for i in range(20)] + _, text, _ = compose_digest(sub, rows) + assert "weitere" not in text + + def test_html_escaping_in_title(self): + sub = _make_sub() + row = _make_assessment(title='') + _, _, html_body = compose_digest(sub, [row]) + assert "