Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
409 lines
17 KiB
Python
409 lines
17 KiB
Python
"""Bug-Regression-Tests für fünf Fix-Commits ohne bisherige Test-Coverage.
|
|
|
|
Je ein Mini-Integration-Test anchored am konkreten Fehler:
|
|
3e71547 — PRAGMA cursor fetchall() vor iteration in SQLite
|
|
49c1b92 — JWT azp statt aud bei Keycloak Public Clients
|
|
1414057 — CDU-PDF AssertionError Fallback in render_highlighted_page
|
|
5ea507b — PFLICHT-FRAKTIONEN = alle LT-Fraktionen
|
|
038ebd6 — NRW-Titel + Regierungsfraktionen-Pflicht im LLM-Prompt
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
import types
|
|
|
|
import pytest
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stubs für externe Deps, die in der lokalen Dev-Umgebung nicht vollständig
|
|
# installiert sind (openai AsyncOpenAI, aiosmtplib, etc.)
|
|
# ---------------------------------------------------------------------------
|
|
if "openai" not in sys.modules or not hasattr(sys.modules.get("openai"), "AsyncOpenAI"):
|
|
openai_stub = types.ModuleType("openai")
|
|
openai_stub.OpenAI = lambda **kw: None
|
|
openai_stub.AsyncOpenAI = lambda **kw: None
|
|
sys.modules["openai"] = openai_stub
|
|
|
|
|
|
# ===========================================================================
|
|
# Bug 1 — PRAGMA cursor fetchall() vor Iteration (Commit 3e71547)
|
|
# ===========================================================================
|
|
# Vor dem Fix wurde `conn.execute("PRAGMA table_info(...)")` direkt iteriert,
|
|
# ohne fetchall() aufzurufen. Bei aiosqlite führt das zu einer Exception oder
|
|
# leeren Ergebnissen. Der Fix: `cursor.fetchall()` vor dem Set-Comprehension.
|
|
# Regression: init_db() muss die PRAGMA-Rows korrekt auslesen — fehlt die
|
|
# Spalte, wird ein ALTER TABLE versucht; ist sie da, wird nichts gemacht.
|
|
|
|
class TestPragmaCursorFetchall:
|
|
def test_table_info_fetchall_reads_columns(self, tmp_path):
|
|
"""PRAGMA table_info liefert Spaltennamen korrekt via fetchall()."""
|
|
db = tmp_path / "test.db"
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute("CREATE TABLE assessments (drucksache TEXT, konfidenz TEXT, summary_embedding BLOB)")
|
|
conn.commit()
|
|
|
|
# Exakt wie in database.py nach dem Fix: execute().fetchall()
|
|
cursor = conn.execute("PRAGMA table_info(assessments)")
|
|
cols = {r[1] for r in cursor.fetchall()}
|
|
conn.close()
|
|
|
|
assert "konfidenz" in cols
|
|
assert "summary_embedding" in cols
|
|
assert "drucksache" in cols
|
|
|
|
def test_fetchall_before_set_comprehension_no_crash(self, tmp_path):
|
|
"""Direktes Iterieren über cursor ohne fetchall() — Regression-Guard.
|
|
|
|
Früher wurde `for r in cursor` statt `for r in cursor.fetchall()` verwendet.
|
|
Der Test stellt sicher, dass fetchall() explizit aufgerufen wird und
|
|
die Ergebnisliste nicht leer ist.
|
|
"""
|
|
db = tmp_path / "test2.db"
|
|
conn = sqlite3.connect(str(db))
|
|
conn.execute("CREATE TABLE jobs (id TEXT, drucksache TEXT, status TEXT)")
|
|
conn.commit()
|
|
|
|
cursor = conn.execute("PRAGMA table_info(jobs)")
|
|
rows = cursor.fetchall() # Fix: fetchall() vor Iteration
|
|
cols = {r[1] for r in rows}
|
|
conn.close()
|
|
|
|
assert len(cols) == 3
|
|
assert "drucksache" in cols
|
|
|
|
def test_init_db_does_not_crash_on_existing_db(self, tmp_path, monkeypatch):
|
|
"""init_db() läuft auf einer bereits initialisierten DB durch (kein PRAGMA-Crash)."""
|
|
from app import config
|
|
monkeypatch.setattr(config.settings, "db_path", tmp_path / "gwoe.db")
|
|
|
|
from app.database import init_db
|
|
asyncio.get_event_loop().run_until_complete(init_db())
|
|
# Zweiter Aufruf — alle ALTER-TABLE-Checks laufen durch
|
|
asyncio.get_event_loop().run_until_complete(init_db())
|
|
|
|
|
|
# ===========================================================================
|
|
# Bug 2 — JWT azp statt aud (Commit 49c1b92)
|
|
# ===========================================================================
|
|
# Keycloak setzt bei Public Clients aud="account", nicht den client_id.
|
|
# Der Fix: verify_aud=False, stattdessen payload["azp"] == client_id prüfen.
|
|
|
|
class TestJwtAzpCheck:
|
|
"""_validate_token prüft azp, nicht aud."""
|
|
|
|
def _run(self, coro):
|
|
return asyncio.get_event_loop().run_until_complete(coro)
|
|
|
|
def _patch_jose(self, payload: dict):
|
|
"""Stub jose.jwt so it returns the given payload on decode()."""
|
|
jose_mod = types.ModuleType("jose")
|
|
jose_jwt = types.ModuleType("jose.jwt")
|
|
jose_jwt.get_unverified_header = lambda t: {"kid": "test-kid", "alg": "RS256"}
|
|
jose_jwt.decode = lambda token, key, **kw: payload
|
|
jose_mod.jwt = jose_jwt
|
|
jose_mod.JWTError = Exception
|
|
jose_mod.ExpiredSignatureError = type("ExpiredSignatureError", (Exception,), {})
|
|
sys.modules["jose"] = jose_mod
|
|
sys.modules["jose.jwt"] = jose_jwt
|
|
return jose_mod
|
|
|
|
def test_valid_azp_returns_user(self, monkeypatch):
|
|
"""Wenn azp == client_id, gibt _validate_token ein User-Dict zurück."""
|
|
from app import config
|
|
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
|
|
monkeypatch.setattr(config.settings, "keycloak_realm", "realm")
|
|
monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client")
|
|
|
|
payload = {
|
|
"sub": "user-123",
|
|
"email": "user@test.de",
|
|
"preferred_username": "testuser",
|
|
"azp": "my-client", # korrekt
|
|
"aud": "account", # Public Client — nicht unser client_id
|
|
"realm_access": {"roles": []},
|
|
}
|
|
self._patch_jose(payload)
|
|
|
|
jwks = {"keys": [{"kid": "test-kid", "kty": "RSA"}]}
|
|
|
|
from app.auth import _validate_token
|
|
import app.auth as auth_mod
|
|
# Stub _get_jwks
|
|
async def _fake_jwks():
|
|
return jwks
|
|
|
|
monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks)
|
|
result = self._run(_validate_token("fake.jwt.token"))
|
|
|
|
assert result is not None
|
|
assert result["sub"] == "user-123"
|
|
|
|
def test_wrong_azp_returns_none(self, monkeypatch):
|
|
"""Wenn azp != client_id, gibt _validate_token None zurück (auch wenn aud passt)."""
|
|
from app import config
|
|
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
|
|
monkeypatch.setattr(config.settings, "keycloak_realm", "realm")
|
|
monkeypatch.setattr(config.settings, "keycloak_client_id", "my-client")
|
|
|
|
payload = {
|
|
"sub": "attacker-999",
|
|
"azp": "other-client", # falscher azp
|
|
"aud": "my-client", # aud passt zufällig — darf NICHT reichen
|
|
"realm_access": {"roles": []},
|
|
}
|
|
self._patch_jose(payload)
|
|
|
|
jwks = {"keys": [{"kid": "test-kid"}]}
|
|
|
|
from app.auth import _validate_token
|
|
import app.auth as auth_mod
|
|
async def _fake_jwks():
|
|
return jwks
|
|
monkeypatch.setattr(auth_mod, "_get_jwks", _fake_jwks)
|
|
|
|
result = self._run(_validate_token("fake.jwt.token"))
|
|
assert result is None, "azp-Mismatch muss zu None führen"
|
|
|
|
def test_verify_aud_is_disabled(self):
|
|
"""Die Source von _validate_token muss options={'verify_aud': False} enthalten."""
|
|
import inspect
|
|
from app.auth import _validate_token
|
|
source = inspect.getsource(_validate_token)
|
|
assert "verify_aud" in source and "False" in source, (
|
|
"_validate_token muss verify_aud=False in den JWT-decode-Options setzen"
|
|
)
|
|
|
|
def test_azp_field_checked_not_aud(self):
|
|
"""Die Source von _validate_token muss explizit 'azp' prüfen, nicht 'aud'."""
|
|
import inspect
|
|
from app.auth import _validate_token
|
|
source = inspect.getsource(_validate_token)
|
|
assert 'payload.get("azp")' in source or "payload['azp']" in source, (
|
|
"_validate_token muss payload['azp'] gegen client_id prüfen"
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# Bug 3 — CDU-PDF AssertionError Fallback (Commit 1414057)
|
|
# ===========================================================================
|
|
# render_highlighted_page() von embeddings.py wirft bei manchen CDU-PDFs einen
|
|
# AssertionError in PyMuPDF tobytes(). Der Fix fängt (AssertionError, Exception)
|
|
# und gibt die ursprüngliche PDF-Datei zurück.
|
|
|
|
class TestCduPdfAssertionFallback:
|
|
def test_tobytes_assertionerror_returns_original_pdf(self, tmp_path, monkeypatch):
|
|
"""Wenn tobytes() AssertionError wirft, liefert render_highlighted_page das Original-PDF."""
|
|
import app.embeddings as emb_mod
|
|
|
|
# Minimales Fake-PDF (realer Inhalt nicht nötig, nur Bytes)
|
|
fake_pdf_bytes = b"%PDF-1.4 fakecontent"
|
|
pdf_path = tmp_path / "cdu-test.pdf"
|
|
pdf_path.write_bytes(fake_pdf_bytes)
|
|
|
|
# Stub PROGRAMME registry
|
|
monkeypatch.setattr(
|
|
emb_mod, "PROGRAMME",
|
|
{"cdu-test": {"name": "CDU Test", "typ": "wahlprogramm",
|
|
"partei": "CDU", "pdf": "cdu-test.pdf"}},
|
|
)
|
|
|
|
class FakePage:
|
|
def search_for(self, needle):
|
|
return []
|
|
def add_highlight_annot(self, rect):
|
|
return None
|
|
|
|
class FakeDoc:
|
|
def __init__(self, *a, **kw):
|
|
self._pages = [FakePage()]
|
|
def __len__(self):
|
|
return 1
|
|
def __getitem__(self, idx):
|
|
return self._pages[idx]
|
|
def tobytes(self):
|
|
raise AssertionError("CDU-PDF kaputt — AssertionError aus PyMuPDF")
|
|
def close(self):
|
|
pass
|
|
|
|
# fitz is a thin wrapper around pymupdf; patch the fitz.open used inside embeddings.py
|
|
import fitz
|
|
import pymupdf
|
|
# Patch both possible references
|
|
monkeypatch.setattr(fitz, "open", FakeDoc, raising=False)
|
|
monkeypatch.setattr(pymupdf, "open", FakeDoc, raising=False)
|
|
|
|
# Redirect referenzen-Pfad zu tmp_path
|
|
from pathlib import Path as _Path
|
|
|
|
original_truediv = _Path.__truediv__
|
|
|
|
def _redirect_truediv(self, other):
|
|
result = original_truediv(self, other)
|
|
if "referenzen" in str(result) and str(other) == "cdu-test.pdf":
|
|
return pdf_path
|
|
if "referenzen" in str(result):
|
|
return tmp_path
|
|
return result
|
|
|
|
monkeypatch.setattr(_Path, "__truediv__", _redirect_truediv)
|
|
|
|
pdf_bytes, found_page, highlighted = emb_mod.render_highlighted_page(
|
|
"cdu-test", seite=1, query="Wirtschaft"
|
|
)
|
|
|
|
assert pdf_bytes == fake_pdf_bytes, "Fallback muss Original-PDF-Bytes zurückgeben"
|
|
assert highlighted is False
|
|
|
|
def test_assertion_error_fallback_present_in_source(self):
|
|
"""render_highlighted_page muss AssertionError in einem try/except fangen."""
|
|
import inspect
|
|
import app.embeddings as emb_mod
|
|
source = inspect.getsource(emb_mod.render_highlighted_page)
|
|
assert "AssertionError" in source, (
|
|
"render_highlighted_page muss AssertionError explizit fangen (CDU-PDF-Fallback)"
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# Bug 4 — PFLICHT-FRAKTIONEN = alle LT-Fraktionen (Commit 5ea507b)
|
|
# ===========================================================================
|
|
# Vor dem Fix wurden nur Antragsteller + Regierungsfraktionen als PFLICHT-
|
|
# FRAKTIONEN ausgegeben. Der Fix gibt alle landtagsfraktionen aus.
|
|
|
|
class TestPflichtFraktionen:
|
|
def _build_user_prompt(self, bundesland: str = "NRW") -> str:
|
|
"""Baut den user_prompt wie in analyzer.py — minimal, ohne LLM-Call."""
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
from app.analyzer import get_bundesland_context
|
|
|
|
bl = BUNDESLAENDER[bundesland]
|
|
pflicht = ", ".join(bl.landtagsfraktionen)
|
|
return (
|
|
f"**PFLICHT-FRAKTIONEN:** Du MUSST ALLE folgenden Fraktionen der "
|
|
f"aktuellen Wahlperiode in `wahlprogrammScores` bewerten — keine auslassen:\n"
|
|
f"{pflicht}"
|
|
)
|
|
|
|
def test_afd_in_pflicht_fraktionen_nrw(self):
|
|
"""AfD muss in PFLICHT-FRAKTIONEN für NRW stehen, auch ohne Antragsteller."""
|
|
prompt = self._build_user_prompt("NRW")
|
|
assert "AfD" in prompt, "AfD fehlt in PFLICHT-FRAKTIONEN (NRW)"
|
|
|
|
def test_all_nrw_fraktionen_in_prompt(self):
|
|
"""Alle NRW-Landtagsfraktionen müssen im PFLICHT-Block stehen."""
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
prompt = self._build_user_prompt("NRW")
|
|
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
|
|
assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im PFLICHT-Block"
|
|
|
|
def test_analyzer_user_prompt_contains_all_fraktionen(self, monkeypatch):
|
|
"""analyze_antrag baut einen user_prompt mit allen LT-Fraktionen als PFLICHT."""
|
|
# Nach ADR 0008: Wir reichen einen FakeLlmBewerter statt den
|
|
# AsyncOpenAI-Client zu monkeypatchen. Der Fake captured den
|
|
# user_prompt aus dem ``LlmRequest`` und liefert ein minimales
|
|
# gültiges Assessment-Dict zurück.
|
|
import app.analyzer as analyzer_mod
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
|
|
captured_prompts: list[str] = []
|
|
|
|
class FakeBewerter:
|
|
async def bewerte(self, request):
|
|
captured_prompts.append(request.user_prompt)
|
|
return {
|
|
"drucksache": "18/1",
|
|
"title": "Test",
|
|
"fraktionen": ["SPD"],
|
|
"datum": "2024-01-01",
|
|
"link": None,
|
|
"gwoeScore": 5,
|
|
"gwoeBegründung": "Test",
|
|
"gwoeMatrix": [],
|
|
"gwoeSchwerpunkt": [],
|
|
"wahlprogrammScores": [],
|
|
"verbesserungen": [],
|
|
"stärken": [],
|
|
"schwächen": [],
|
|
"empfehlung": "Überarbeiten",
|
|
"empfehlungSymbol": "[!]",
|
|
"verbesserungspotenzial": "mittel",
|
|
"themen": [],
|
|
"antragZusammenfassung": "Test",
|
|
"antragKernpunkte": [],
|
|
"konfidenz": "mittel",
|
|
"shareThreads": "",
|
|
"shareTwitter": "",
|
|
"shareMastodon": "",
|
|
}
|
|
|
|
import app.embeddings as emb_mod
|
|
monkeypatch.setattr(emb_mod, "EMBEDDINGS_DB", type("P", (), {"exists": lambda self: False})())
|
|
|
|
asyncio.get_event_loop().run_until_complete(
|
|
analyzer_mod.analyze_antrag(
|
|
text="Der SPD-Antrag fordert mehr Klimaschutz in NRW.",
|
|
bundesland="NRW",
|
|
model="qwen-plus",
|
|
bewerter=FakeBewerter(),
|
|
)
|
|
)
|
|
|
|
assert captured_prompts, "user_prompt muss gebaut worden sein"
|
|
prompt = captured_prompts[0]
|
|
|
|
# AfD ist keine Regierungsfraktion in NRW — muss aber trotzdem stehen
|
|
assert "AfD" in prompt, "AfD fehlt im user_prompt (PFLICHT-FRAKTIONEN-Bug)"
|
|
# Alle NRW-Fraktionen prüfen
|
|
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
|
|
assert fraktion in prompt, f"Fraktion {fraktion!r} fehlt im user_prompt"
|
|
|
|
|
|
# ===========================================================================
|
|
# Bug 5 — NRW-Titel + Regierungsfraktionen im LLM-Prompt (Commit 038ebd6)
|
|
# ===========================================================================
|
|
# get_bundesland_context() muss den Parlamentsnamen und die Regierungsfraktionen
|
|
# korrekt im Context-String ausgeben.
|
|
|
|
class TestNrwTitelRegierungsfraktionen:
|
|
def test_bundesland_context_contains_regierungsfraktionen(self):
|
|
"""get_bundesland_context gibt für NRW die aktuellen Regierungsfraktionen aus."""
|
|
from app.analyzer import get_bundesland_context
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
|
|
ctx = get_bundesland_context("NRW")
|
|
|
|
for regfrak in BUNDESLAENDER["NRW"].regierungsfraktionen:
|
|
assert regfrak in ctx, (
|
|
f"Regierungsfraktion {regfrak!r} fehlt im Bundesland-Context für NRW"
|
|
)
|
|
|
|
def test_bundesland_context_contains_parliament_name(self):
|
|
"""get_bundesland_context gibt den Parlamentsnamen aus."""
|
|
from app.analyzer import get_bundesland_context
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
|
|
ctx = get_bundesland_context("NRW")
|
|
parliament = BUNDESLAENDER["NRW"].parlament_name
|
|
assert parliament in ctx, (
|
|
f"Parlamentsname {parliament!r} fehlt im Context-String"
|
|
)
|
|
|
|
def test_bundesland_context_contains_landtagsfraktionen(self):
|
|
"""get_bundesland_context listet alle LT-Fraktionen auf."""
|
|
from app.analyzer import get_bundesland_context
|
|
from app.bundeslaender import BUNDESLAENDER
|
|
|
|
ctx = get_bundesland_context("NRW")
|
|
for fraktion in BUNDESLAENDER["NRW"].landtagsfraktionen:
|
|
assert fraktion in ctx, (
|
|
f"Landtagsfraktion {fraktion!r} fehlt im Bundesland-Context"
|
|
)
|
|
|
|
def test_regierungsfraktionen_label_present_in_context(self):
|
|
"""Der Context-String enthält den Label 'Regierungsfraktionen'."""
|
|
from app.analyzer import get_bundesland_context
|
|
ctx = get_bundesland_context("NRW")
|
|
assert "Regierungsfraktionen" in ctx
|