"""Bug-Regression-Tests für fünf Fix-Commits ohne bisherige Test-Coverage. Je ein Mini-Integration-Test anchored am konkreten Fehler: 3e71547 — PRAGMA cursor fetchall() vor iteration in SQLite 49c1b92 — JWT azp statt aud bei Keycloak Public Clients 1414057 — CDU-PDF AssertionError Fallback in render_highlighted_page 5ea507b — PFLICHT-FRAKTIONEN = alle LT-Fraktionen 038ebd6 — NRW-Titel + Regierungsfraktionen-Pflicht im LLM-Prompt """ import asyncio import json import sqlite3 import sys import types import pytest # --------------------------------------------------------------------------- # Stubs für externe Deps, die in der lokalen Dev-Umgebung nicht vollständig # installiert sind (openai AsyncOpenAI, aiosmtplib, etc.) # --------------------------------------------------------------------------- if "openai" not in sys.modules or not hasattr(sys.modules.get("openai"), "AsyncOpenAI"): openai_stub = types.ModuleType("openai") openai_stub.OpenAI = lambda **kw: None openai_stub.AsyncOpenAI = lambda **kw: None sys.modules["openai"] = openai_stub # =========================================================================== # Bug 1 — PRAGMA cursor fetchall() vor Iteration (Commit 3e71547) # =========================================================================== # Vor dem Fix wurde `conn.execute("PRAGMA table_info(...)")` direkt iteriert, # ohne fetchall() aufzurufen. Bei aiosqlite führt das zu einer Exception oder # leeren Ergebnissen. Der Fix: `cursor.fetchall()` vor dem Set-Comprehension. # Regression: init_db() muss die PRAGMA-Rows korrekt auslesen — fehlt die # Spalte, wird ein ALTER TABLE versucht; ist sie da, wird nichts gemacht. class TestPragmaCursorFetchall: def test_table_info_fetchall_reads_columns(self, tmp_path): """PRAGMA table_info liefert Spaltennamen korrekt via fetchall().""" db = tmp_path / "test.db" conn = sqlite3.connect(str(db)) conn.execute("CREATE TABLE assessments (drucksache TEXT, konfidenz TEXT, summary_embedding BLOB)") conn.commit() # Exakt wie in database.py nach dem Fix: execute().fetchall() cursor = conn.execute("PRAGMA table_info(assessments)") cols = {r[1] for r in cursor.fetchall()} conn.close() assert "konfidenz" in cols assert "summary_embedding" in cols assert "drucksache" in cols def test_fetchall_before_set_comprehension_no_crash(self, tmp_path): """Direktes Iterieren über cursor ohne fetchall() — Regression-Guard. Früher wurde `for r in cursor` statt `for r in cursor.fetchall()` verwendet. Der Test stellt sicher, dass fetchall() explizit aufgerufen wird und die Ergebnisliste nicht leer ist. """ db = tmp_path / "test2.db" conn = sqlite3.connect(str(db)) conn.execute("CREATE TABLE jobs (id TEXT, drucksache TEXT, status TEXT)") conn.commit() cursor = conn.execute("PRAGMA table_info(jobs)") rows = cursor.fetchall() # Fix: fetchall() vor Iteration cols = {r[1] for r in rows} conn.close() assert len(cols) == 3 assert "drucksache" in cols def test_init_db_does_not_crash_on_existing_db(self, tmp_path, monkeypatch): """init_db() läuft auf einer bereits initialisierten DB durch (kein PRAGMA-Crash).""" from app import config monkeypatch.setattr(config.settings, "db_path", tmp_path / "gwoe.db") from app.database import init_db asyncio.get_event_loop().run_until_complete(init_db()) # Zweiter Aufruf — alle ALTER-TABLE-Checks laufen durch asyncio.get_event_loop().run_until_complete(init_db()) # =========================================================================== # Bug 2 — JWT azp statt aud (Commit 49c1b92) # =========================================================================== # Keycloak setzt bei Public Clients aud="account", nicht den client_id. # Der Fix: verify_aud=False, stattdessen payload["azp"] == client_id prüfen. class TestJwtAzpCheck: """_validate_token prüft azp, nicht aud.""" def _run(self, coro): return asyncio.get_event_loop().run_until_complete(coro) def _patch_jose(self, payload: dict): """Stub jose.jwt so it returns the given payload on decode().""" jose_mod = types.ModuleType("jose") jose_jwt = types.ModuleType("jose.jwt") jose_jwt.get_unverified_header = lambda t: {"kid": "test-kid", "alg": "RS256"} jose_jwt.decode = lambda token, key, **kw: payload jose_mod.jwt = jose_jwt jose_mod.JWTError = Exception jose_mod.ExpiredSignatureError = type("ExpiredSignatureError", (Exception,), {}) sys.modules["jose"] = jose_mod sys.modules["jose.jwt"] = jose_jwt return jose_mod def test_valid_azp_returns_user(self, monkeypatch): """Wenn azp == client_id, gibt _validate_token ein User-Dict zurück.""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "realm") monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client") payload = { "sub": "user-123", "email": "user@test.de", "preferred_username": "testuser", "azp": "my-client", # korrekt "aud": "account", # Public Client — nicht unser client_id "realm_access": {"roles": []}, } self._patch_jose(payload) jwks = {"keys": [{"kid": "test-kid", "kty": "RSA"}]} from app.auth import _validate_token import app.auth as auth_mod # Stub _get_jwks async def _fake_jwks(): return jwks monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks) result = self._run(_validate_token("fake.jwt.token")) assert result is not None assert result["sub"] == "user-123" def test_wrong_azp_returns_none(self, monkeypatch): """Wenn azp != client_id, gibt _validate_token None zurück (auch wenn aud passt).""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "realm") monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client") payload = { "sub": "attacker-999", "azp": "other-client", # falscher azp "aud": "my-client", # aud passt zufällig — darf NICHT reichen "realm_access": {"roles": []}, } self._patch_jose(payload) jwks = {"keys": [{"kid": "test-kid"}]} from app.auth import _validate_token import app.auth as auth_mod async def _fake_jwks(): return jwks monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks) result = self._run(_validate_token("fake.jwt.token")) assert result is None, "azp-Mismatch muss zu None führen" def test_verify_aud_is_disabled(self): """Die Source von _validate_token muss options={'verify_aud': False} enthalten.""" import inspect from app.auth import _validate_token source = inspect.getsource(_validate_token) assert "verify_aud" in source and "False" in source, ( "_validate_token muss verify_aud=False in den JWT-decode-Options setzen" ) def test_azp_field_checked_not_aud(self): """Die Source von _validate_token muss explizit 'azp' prüfen, nicht 'aud'.""" import inspect from app.auth import _validate_token source = inspect.getsource(_validate_token) assert 'payload.get("azp")' in source or "payload['azp']" in source, ( "_validate_token muss payload['azp'] gegen client_id prüfen" ) # =========================================================================== # Bug 3 — CDU-PDF AssertionError Fallback (Commit 1414057) # =========================================================================== # render_highlighted_page() von embeddings.py wirft bei manchen CDU-PDFs einen # AssertionError in PyMuPDF tobytes(). Der Fix fängt (AssertionError, Exception) # und gibt die ursprüngliche PDF-Datei zurück. class TestCduPdfAssertionFallback: def test_tobytes_assertionerror_returns_original_pdf(self, tmp_path, monkeypatch): """Wenn tobytes() AssertionError wirft, liefert render_highlighted_page das Original-PDF.""" import app.embeddings as emb_mod # Minimales Fake-PDF (realer Inhalt nicht nötig, nur Bytes) fake_pdf_bytes = b"%PDF-1.4 fakecontent" pdf_path = tmp_path / "cdu-test.pdf" pdf_path.write_bytes(fake_pdf_bytes) # Stub PROGRAMME registry monkeypatch.setattr( emb_mod, "PROGRAMME", {"cdu-test": {"name": "CDU Test", "typ": "wahlprogramm", "partei": "CDU", "pdf": "cdu-test.pdf"}}, ) class FakePage: def search_for(self, needle): return [] def add_highlight_annot(self, rect): return None class FakeDoc: def __init__(self, *a, **kw): self._pages = [FakePage()] def __len__(self): return 1 def __getitem__(self, idx): return self._pages[idx] def tobytes(self): raise AssertionError("CDU-PDF kaputt — AssertionError aus PyMuPDF") def close(self): pass # fitz is a thin wrapper around pymupdf; patch the fitz.open used inside embeddings.py import fitz import pymupdf # Patch both possible references monkeypatch.setattr(fitz, "open", FakeDoc, raising=False) monkeypatch.setattr(pymupdf, "open", FakeDoc, raising=False) # Redirect referenzen-Pfad zu tmp_path from pathlib import Path as _Path original_truediv = _Path.__truediv__ def _redirect_truediv(self, other): result = original_truediv(self, other) if "referenzen" in str(result) and str(other) == "cdu-test.pdf": return pdf_path if "referenzen" in str(result): return tmp_path return result monkeypatch.setattr(_Path, "__truediv__", _redirect_truediv) pdf_bytes, found_page, highlighted = emb_mod.render_highlighted_page( "cdu-test", seite=1, query="Wirtschaft" ) assert pdf_bytes == fake_pdf_bytes, "Fallback muss Original-PDF-Bytes zurückgeben" assert highlighted is False def test_assertion_error_fallback_present_in_source(self): """render_highlighted_page muss AssertionError in einem try/except fangen.""" import inspect import app.embeddings as emb_mod source = inspect.getsource(emb_mod.render_highlighted_page) assert "AssertionError" in source, ( "render_highlighted_page muss AssertionError explizit fangen (CDU-PDF-Fallback)" ) # =========================================================================== # Bug 4 — PFLICHT-FRAKTIONEN = alle LT-Fraktionen (Commit 5ea507b) # =========================================================================== # Vor dem Fix wurden nur Antragsteller + Regierungsfraktionen als PFLICHT- # FRAKTIONEN ausgegeben. Der Fix gibt alle landtagsfraktionen aus. class TestPflichtFraktionen: def _build_user_prompt(self, bundesland: str = "NRW") -> str: """Baut den user_prompt wie in analyzer.py — minimal, ohne LLM-Call.""" from app.bundeslaender import BUNDESLAENDER from app.analyzer import get_bundesland_context bl = BUNDESLAENDER[bundesland] pflicht = ", ".join(bl.landtagsfraktionen) return ( f"**PFLICHT-FRAKTIONEN:** Du MUSST ALLE folgenden Fraktionen der " f"aktuellen Wahlperiode in `wahlprogrammScores` bewerten — keine auslassen:\n" f"{pflicht}" ) def test_afd_in_pflicht_fraktionen_nrw(self): """AfD muss in PFLICHT-FRAKTIONEN für NRW stehen, auch ohne Antragsteller.""" prompt = self._build_user_prompt("NRW") assert "AfD" in prompt, "AfD fehlt in PFLICHT-FRAKTIONEN (NRW)" def test_all_nrw_fraktionen_in_prompt(self): """Alle NRW-Landtagsfraktionen müssen im PFLICHT-Block stehen.""" from app.bundeslaender import BUNDESLAENDER prompt = self._build_user_prompt("NRW") for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im PFLICHT-Block" def test_analyzer_user_prompt_contains_all_fraktionen(self, monkeypatch): """analyze_antrag baut einen user_prompt mit allen LT-Fraktionen als PFLICHT.""" # Nach ADR 0008: Wir reichen einen FakeLlmBewerter statt den # AsyncOpenAI-Client zu monkeypatchen. Der Fake captured den # user_prompt aus dem ``LlmRequest`` und liefert ein minimales # gültiges Assessment-Dict zurück. import app.analyzer as analyzer_mod from app.bundeslaender import BUNDESLAENDER captured_prompts: list[str] = [] class FakeBewerter: async def bewerte(self, request): captured_prompts.append(request.user_prompt) return { "drucksache": "18/1", "title": "Test", "fraktionen": ["SPD"], "datum": "2024-01-01", "link": None, "gwoeScore": 5, "gwoeBegründung": "Test", "gwoeMatrix": [], "gwoeSchwerpunkt": [], "wahlprogrammScores": [], "verbesserungen": [], "stärken": [], "schwächen": [], "empfehlung": "Überarbeiten", "empfehlungSymbol": "[!]", "verbesserungspotenzial": "mittel", "themen": [], "antragZusammenfassung": "Test", "antragKernpunkte": [], "konfidenz": "mittel", "shareThreads": "", "shareTwitter": "", "shareMastodon": "", } import app.embeddings as emb_mod monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", type("P", (), {"exists": lambda self: False})()) asyncio.get_event_loop().run_until_complete( analyzer_mod.analyze_antrag( text="Der SPD-Antrag fordert mehr Klimaschutz in NRW.", bundesland="NRW", model="qwen-plus", bewerter=FakeBewerter(), ) ) assert captured_prompts, "user_prompt muss gebaut worden sein" prompt = captured_prompts[0] # AfD ist keine Regierungsfraktion in NRW — muss aber trotzdem stehen assert "AfD" in prompt, "AfD fehlt im user_prompt (PFLICHT-FRAKTIONEN-Bug)" # Alle NRW-Fraktionen prüfen for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im user_prompt" # =========================================================================== # Bug 5 — NRW-Titel + Regierungsfraktionen im LLM-Prompt (Commit 038ebd6) # =========================================================================== # get_bundesland_context() muss den Parlamentsnamen und die Regierungsfraktionen # korrekt im Context-String ausgeben. class TestNrwTitelRegierungsfraktionen: def test_bundesland_context_contains_regierungsfraktionen(self): """get_bundesland_context gibt für NRW die aktuellen Regierungsfraktionen aus.""" from app.analyzer import get_bundesland_context from app.bundeslaender import BUNDESLAENDER ctx = get_bundesland_context("NRW") for regfrak in BUNDESLAENDER["NRW"].regierungsfraktionen: assert regfrak in ctx, ( f"Regierungsfraktion {regfrak!r} fehlt im Bundesland-Context für NRW" ) def test_bundesland_context_contains_parliament_name(self): """get_bundesland_context gibt den Parlamentsnamen aus.""" from app.analyzer import get_bundesland_context from app.bundeslaender import BUNDESLAENDER ctx = get_bundesland_context("NRW") parliament = BUNDESLAENDER["NRW"].parlament_name assert parliament in ctx, ( f"Parlamentsname {parliament!r} fehlt im Context-String" ) def test_bundesland_context_contains_landtagsfraktionen(self): """get_bundesland_context listet alle LT-Fraktionen auf.""" from app.analyzer import get_bundesland_context from app.bundeslaender import BUNDESLAENDER ctx = get_bundesland_context("NRW") for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen: assert fraktion in ctx, ( f"Landtagsfraktion {fraktion!r} fehlt im Bundesland-Context" ) def test_regierungsfraktionen_label_present_in_context(self): """Der Context-String enthält den Label 'Regierungsfraktionen'.""" from app.analyzer import get_bundesland_context ctx = get_bundesland_context("NRW") assert "Regierungsfraktionen" in ctx