gwoe-antragspruefer/tests/test_bug_regressions.py

409 lines
17 KiB
Python
Raw Permalink Normal View History

"""Bug-Regression-Tests für fünf Fix-Commits ohne bisherige Test-Coverage.
Je ein Mini-Integration-Test anchored am konkreten Fehler:
3e71547 PRAGMA cursor fetchall() vor iteration in SQLite
49c1b92 JWT azp statt aud bei Keycloak Public Clients
1414057 CDU-PDF AssertionError Fallback in render_highlighted_page
5ea507b PFLICHT-FRAKTIONEN = alle LT-Fraktionen
038ebd6 NRW-Titel + Regierungsfraktionen-Pflicht im LLM-Prompt
"""
import asyncio
import json
import sqlite3
import sys
import types
import pytest
# ---------------------------------------------------------------------------
# Stubs für externe Deps, die in der lokalen Dev-Umgebung nicht vollständig
# installiert sind (openai AsyncOpenAI, aiosmtplib, etc.)
# ---------------------------------------------------------------------------
if "openai" not in sys.modules or not hasattr(sys.modules.get("openai"), "AsyncOpenAI"):
openai_stub = types.ModuleType("openai")
openai_stub.OpenAI = lambda **kw: None
openai_stub.AsyncOpenAI = lambda **kw: None
sys.modules["openai"] = openai_stub
# ===========================================================================
# Bug 1 — PRAGMA cursor fetchall() vor Iteration (Commit 3e71547)
# ===========================================================================
# Vor dem Fix wurde `conn.execute("PRAGMA table_info(...)")` direkt iteriert,
# ohne fetchall() aufzurufen. Bei aiosqlite führt das zu einer Exception oder
# leeren Ergebnissen. Der Fix: `cursor.fetchall()` vor dem Set-Comprehension.
# Regression: init_db() muss die PRAGMA-Rows korrekt auslesen — fehlt die
# Spalte, wird ein ALTER TABLE versucht; ist sie da, wird nichts gemacht.
class TestPragmaCursorFetchall:
def test_table_info_fetchall_reads_columns(self, tmp_path):
"""PRAGMA table_info liefert Spaltennamen korrekt via fetchall()."""
db = tmp_path / "test.db"
conn = sqlite3.connect(str(db))
conn.execute("CREATE TABLE assessments (drucksache TEXT, konfidenz TEXT, summary_embedding BLOB)")
conn.commit()
# Exakt wie in database.py nach dem Fix: execute().fetchall()
cursor = conn.execute("PRAGMA table_info(assessments)")
cols = {r[1] for r in cursor.fetchall()}
conn.close()
assert "konfidenz" in cols
assert "summary_embedding" in cols
assert "drucksache" in cols
def test_fetchall_before_set_comprehension_no_crash(self, tmp_path):
"""Direktes Iterieren über cursor ohne fetchall() — Regression-Guard.
Früher wurde `for r in cursor` statt `for r in cursor.fetchall()` verwendet.
Der Test stellt sicher, dass fetchall() explizit aufgerufen wird und
die Ergebnisliste nicht leer ist.
"""
db = tmp_path / "test2.db"
conn = sqlite3.connect(str(db))
conn.execute("CREATE TABLE jobs (id TEXT, drucksache TEXT, status TEXT)")
conn.commit()
cursor = conn.execute("PRAGMA table_info(jobs)")
rows = cursor.fetchall() # Fix: fetchall() vor Iteration
cols = {r[1] for r in rows}
conn.close()
assert len(cols) == 3
assert "drucksache" in cols
def test_init_db_does_not_crash_on_existing_db(self, tmp_path, monkeypatch):
"""init_db() läuft auf einer bereits initialisierten DB durch (kein PRAGMA-Crash)."""
from app import config
monkeypatch.setattr(config.settings, "db_path", tmp_path / "gwoe.db")
from app.database import init_db
asyncio.get_event_loop().run_until_complete(init_db())
# Zweiter Aufruf — alle ALTER-TABLE-Checks laufen durch
asyncio.get_event_loop().run_until_complete(init_db())
# ===========================================================================
# Bug 2 — JWT azp statt aud (Commit 49c1b92)
# ===========================================================================
# Keycloak setzt bei Public Clients aud="account", nicht den client_id.
# Der Fix: verify_aud=False, stattdessen payload["azp"] == client_id prüfen.
class TestJwtAzpCheck:
"""_validate_token prüft azp, nicht aud."""
def _run(self, coro):
return asyncio.get_event_loop().run_until_complete(coro)
def _patch_jose(self, payload: dict):
"""Stub jose.jwt so it returns the given payload on decode()."""
jose_mod = types.ModuleType("jose")
jose_jwt = types.ModuleType("jose.jwt")
jose_jwt.get_unverified_header = lambda t: {"kid": "test-kid", "alg": "RS256"}
jose_jwt.decode = lambda token, key, **kw: payload
jose_mod.jwt = jose_jwt
jose_mod.JWTError = Exception
jose_mod.ExpiredSignatureError = type("ExpiredSignatureError", (Exception,), {})
sys.modules["jose"] = jose_mod
sys.modules["jose.jwt"] = jose_jwt
return jose_mod
def test_valid_azp_returns_user(self, monkeypatch):
"""Wenn azp == client_id, gibt _validate_token ein User-Dict zurück."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "realm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client")
payload = {
"sub": "user-123",
"email": "user@test.de",
"preferred_username": "testuser",
"azp": "my-client", # korrekt
"aud": "account", # Public Client — nicht unser client_id
"realm_access": {"roles": []},
}
self._patch_jose(payload)
jwks = {"keys": [{"kid": "test-kid", "kty": "RSA"}]}
from app.auth import _validate_token
import app.auth as auth_mod
# Stub _get_jwks
async def _fake_jwks():
return jwks
monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks)
result = self._run(_validate_token("fake.jwt.token"))
assert result is not None
assert result["sub"] == "user-123"
def test_wrong_azp_returns_none(self, monkeypatch):
"""Wenn azp != client_id, gibt _validate_token None zurück (auch wenn aud passt)."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "realm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client")
payload = {
"sub": "attacker-999",
"azp": "other-client", # falscher azp
"aud": "my-client", # aud passt zufällig — darf NICHT reichen
"realm_access": {"roles": []},
}
self._patch_jose(payload)
jwks = {"keys": [{"kid": "test-kid"}]}
from app.auth import _validate_token
import app.auth as auth_mod
async def _fake_jwks():
return jwks
monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks)
result = self._run(_validate_token("fake.jwt.token"))
assert result is None, "azp-Mismatch muss zu None führen"
def test_verify_aud_is_disabled(self):
"""Die Source von _validate_token muss options={'verify_aud': False} enthalten."""
import inspect
from app.auth import _validate_token
source = inspect.getsource(_validate_token)
assert "verify_aud" in source and "False" in source, (
"_validate_token muss verify_aud=False in den JWT-decode-Options setzen"
)
def test_azp_field_checked_not_aud(self):
"""Die Source von _validate_token muss explizit 'azp' prüfen, nicht 'aud'."""
import inspect
from app.auth import _validate_token
source = inspect.getsource(_validate_token)
assert 'payload.get("azp")' in source or "payload['azp']" in source, (
"_validate_token muss payload['azp'] gegen client_id prüfen"
)
# ===========================================================================
# Bug 3 — CDU-PDF AssertionError Fallback (Commit 1414057)
# ===========================================================================
# render_highlighted_page() von embeddings.py wirft bei manchen CDU-PDFs einen
# AssertionError in PyMuPDF tobytes(). Der Fix fängt (AssertionError, Exception)
# und gibt die ursprüngliche PDF-Datei zurück.
class TestCduPdfAssertionFallback:
def test_tobytes_assertionerror_returns_original_pdf(self, tmp_path, monkeypatch):
"""Wenn tobytes() AssertionError wirft, liefert render_highlighted_page das Original-PDF."""
import app.embeddings as emb_mod
# Minimales Fake-PDF (realer Inhalt nicht nötig, nur Bytes)
fake_pdf_bytes = b"%PDF-1.4 fakecontent"
pdf_path = tmp_path / "cdu-test.pdf"
pdf_path.write_bytes(fake_pdf_bytes)
# Stub PROGRAMME registry
monkeypatch.setattr(
emb_mod, "PROGRAMME",
{"cdu-test": {"name": "CDU Test", "typ": "wahlprogramm",
"partei": "CDU", "pdf": "cdu-test.pdf"}},
)
class FakePage:
def search_for(self, needle):
return []
def add_highlight_annot(self, rect):
return None
class FakeDoc:
def __init__(self, *a, **kw):
self._pages = [FakePage()]
def __len__(self):
return 1
def __getitem__(self, idx):
return self._pages[idx]
def tobytes(self):
raise AssertionError("CDU-PDF kaputt — AssertionError aus PyMuPDF")
def close(self):
pass
# fitz is a thin wrapper around pymupdf; patch the fitz.open used inside embeddings.py
import fitz
import pymupdf
# Patch both possible references
monkeypatch.setattr(fitz, "open", FakeDoc, raising=False)
monkeypatch.setattr(pymupdf, "open", FakeDoc, raising=False)
# Redirect referenzen-Pfad zu tmp_path
from pathlib import Path as _Path
original_truediv = _Path.__truediv__
def _redirect_truediv(self, other):
result = original_truediv(self, other)
if "referenzen" in str(result) and str(other) == "cdu-test.pdf":
return pdf_path
if "referenzen" in str(result):
return tmp_path
return result
monkeypatch.setattr(_Path, "__truediv__", _redirect_truediv)
pdf_bytes, found_page, highlighted = emb_mod.render_highlighted_page(
"cdu-test", seite=1, query="Wirtschaft"
)
assert pdf_bytes == fake_pdf_bytes, "Fallback muss Original-PDF-Bytes zurückgeben"
assert highlighted is False
def test_assertion_error_fallback_present_in_source(self):
"""render_highlighted_page muss AssertionError in einem try/except fangen."""
import inspect
import app.embeddings as emb_mod
source = inspect.getsource(emb_mod.render_highlighted_page)
assert "AssertionError" in source, (
"render_highlighted_page muss AssertionError explizit fangen (CDU-PDF-Fallback)"
)
# ===========================================================================
# Bug 4 — PFLICHT-FRAKTIONEN = alle LT-Fraktionen (Commit 5ea507b)
# ===========================================================================
# Vor dem Fix wurden nur Antragsteller + Regierungsfraktionen als PFLICHT-
# FRAKTIONEN ausgegeben. Der Fix gibt alle landtagsfraktionen aus.
class TestPflichtFraktionen:
def _build_user_prompt(self, bundesland: str = "NRW") -> str:
"""Baut den user_prompt wie in analyzer.py — minimal, ohne LLM-Call."""
from app.bundeslaender import BUNDESLAENDER
from app.analyzer import get_bundesland_context
bl = BUNDESLAENDER[bundesland]
pflicht = ", ".join(bl.landtagsfraktionen)
return (
f"**PFLICHT-FRAKTIONEN:** Du MUSST ALLE folgenden Fraktionen der "
f"aktuellen Wahlperiode in `wahlprogrammScores` bewerten — keine auslassen:\n"
f"{pflicht}"
)
def test_afd_in_pflicht_fraktionen_nrw(self):
"""AfD muss in PFLICHT-FRAKTIONEN für NRW stehen, auch ohne Antragsteller."""
prompt = self._build_user_prompt("NRW")
assert "AfD" in prompt, "AfD fehlt in PFLICHT-FRAKTIONEN (NRW)"
def test_all_nrw_fraktionen_in_prompt(self):
"""Alle NRW-Landtagsfraktionen müssen im PFLICHT-Block stehen."""
from app.bundeslaender import BUNDESLAENDER
prompt = self._build_user_prompt("NRW")
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im PFLICHT-Block"
def test_analyzer_user_prompt_contains_all_fraktionen(self, monkeypatch):
"""analyze_antrag baut einen user_prompt mit allen LT-Fraktionen als PFLICHT."""
# Nach ADR 0008: Wir reichen einen FakeLlmBewerter statt den
# AsyncOpenAI-Client zu monkeypatchen. Der Fake captured den
# user_prompt aus dem ``LlmRequest`` und liefert ein minimales
# gültiges Assessment-Dict zurück.
import app.analyzer as analyzer_mod
from app.bundeslaender import BUNDESLAENDER
captured_prompts: list[str] = []
class FakeBewerter:
async def bewerte(self, request):
captured_prompts.append(request.user_prompt)
return {
"drucksache": "18/1",
"title": "Test",
"fraktionen": ["SPD"],
"datum": "2024-01-01",
"link": None,
"gwoeScore": 5,
"gwoeBegründung": "Test",
"gwoeMatrix": [],
"gwoeSchwerpunkt": [],
"wahlprogrammScores": [],
"verbesserungen": [],
"stärken": [],
"schwächen": [],
"empfehlung": "Überarbeiten",
"empfehlungSymbol": "[!]",
"verbesserungspotenzial": "mittel",
"themen": [],
"antragZusammenfassung": "Test",
"antragKernpunkte": [],
"konfidenz": "mittel",
"shareThreads": "",
"shareTwitter": "",
"shareMastodon": "",
}
import app.embeddings as emb_mod
monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", type("P", (), {"exists": lambda self: False})())
asyncio.get_event_loop().run_until_complete(
analyzer_mod.analyze_antrag(
text="Der SPD-Antrag fordert mehr Klimaschutz in NRW.",
bundesland="NRW",
model="qwen-plus",
bewerter=FakeBewerter(),
)
)
assert captured_prompts, "user_prompt muss gebaut worden sein"
prompt = captured_prompts[0]
# AfD ist keine Regierungsfraktion in NRW — muss aber trotzdem stehen
assert "AfD" in prompt, "AfD fehlt im user_prompt (PFLICHT-FRAKTIONEN-Bug)"
# Alle NRW-Fraktionen prüfen
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im user_prompt"
# ===========================================================================
# Bug 5 — NRW-Titel + Regierungsfraktionen im LLM-Prompt (Commit 038ebd6)
# ===========================================================================
# get_bundesland_context() muss den Parlamentsnamen und die Regierungsfraktionen
# korrekt im Context-String ausgeben.
class TestNrwTitelRegierungsfraktionen:
def test_bundesland_context_contains_regierungsfraktionen(self):
"""get_bundesland_context gibt für NRW die aktuellen Regierungsfraktionen aus."""
from app.analyzer import get_bundesland_context
from app.bundeslaender import BUNDESLAENDER
ctx = get_bundesland_context("NRW")
for regfrak in BUNDESLAENDER["NRW"].regierungsfraktionen:
assert regfrak in ctx, (
f"Regierungsfraktion {regfrak!r} fehlt im Bundesland-Context für NRW"
)
def test_bundesland_context_contains_parliament_name(self):
"""get_bundesland_context gibt den Parlamentsnamen aus."""
from app.analyzer import get_bundesland_context
from app.bundeslaender import BUNDESLAENDER
ctx = get_bundesland_context("NRW")
parliament = BUNDESLAENDER["NRW"].parlament_name
assert parliament in ctx, (
f"Parlamentsname {parliament!r} fehlt im Context-String"
)
def test_bundesland_context_contains_landtagsfraktionen(self):
"""get_bundesland_context listet alle LT-Fraktionen auf."""
from app.analyzer import get_bundesland_context
from app.bundeslaender import BUNDESLAENDER
ctx = get_bundesland_context("NRW")
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
assert fraktion in ctx, (
f"Landtagsfraktion {fraktion!r} fehlt im Bundesland-Context"
)
def test_regierungsfraktionen_label_present_in_context(self):
"""Der Context-String enthält den Label 'Regierungsfraktionen'."""
from app.analyzer import get_bundesland_context
ctx = get_bundesland_context("NRW")
assert "Regierungsfraktionen" in ctx