"""Sub-Issue D — Citation Property-Verification. Pro reales Assessment in der ``gwoe-antraege.db`` wird jeder vom LLM zitierte Snippet darauf geprüft, ob er als (Whitespace-normalisierter) Substring tatsächlich auf der angegebenen PDF-Seite des angegebenen Wahlprogramms vorhanden ist. Das ist die kritischste Test-Klasse — fängt **direkt** die Bug-Klasse 7 (LLM halluziniert "FDP NRW Wahlprogramm 2022, S. 75" als Quelle für ein MV-FDP-Antrag-Zitat) und alle künftigen Prompt-Drifts. Es ist die einzige der vier Sub-Issues, die sich nicht auf die LLM-Quellenangabe verlässt, sondern ihren tatsächlichen Wahrheitsgehalt prüft. Match-Strategie (vom User bestätigt): **strict substring** — Whitespace normalisiert, lowercased, mit Toleranz nur für LLM-typische Truncation-Marker (`...` am Anfang/Ende des Zitats). Keine Fuzzy- Matches, kein Jaccard, kein 80%-Overlap. Workflow: 1. Lade die N neuesten Assessments pro aktivem BL aus ``gwoe-antraege.db`` 2. Pro Assessment: parse ``wahlprogramm_scores`` (JSON), iteriere über alle ``zitate`` jeder Fraktion 3. Pro Zitat: - ``quelle`` parsen → Programm-ID via Match gegen ``PROGRAMME[*].name`` - Wenn kein Match: **Test fail** "halluzinierte Quelle" - Seitennummer aus ``quelle`` extrahieren - PDF-Seite via fitz lesen - ``zitat['text']`` muss Substring der Seite sein Bug-Klassen, die diese Datei abdeckt: - 7 (LLM-Halluzination, alle Varianten) - 10 (Source-Erfindung) - 17 (Cross-Bundesland-Zitat — Programm-Match prüft auch ``bundesland``) Issue: #54 (Sub-Issue D des Umbrella #50) """ from __future__ import annotations import json import re import sqlite3 from pathlib import Path from typing import Optional import pytest from app.bundeslaender import aktive_bundeslaender from app.embeddings import PROGRAMME from app.wahlprogramme import REFERENZEN_PATH pytestmark = pytest.mark.integration # ───────────────────────────────────────────────────────────────────────────── # Helpers — die Test-Logik teilt sich in vier reine Funktionen # ───────────────────────────────────────────────────────────────────────────── _RE_PAGE_NUMBER = re.compile(r"S\.\s*(\d+)|Seite\s+(\d+)", re.IGNORECASE) _RE_TRUNCATION = re.compile(r"^\s*\.{2,}|\.{2,}\s*$") _RE_WHITESPACE = re.compile(r"\s+") # PyMuPDF zerlegt Wörter über Zeilenumbrüche mit `-\n` — nach unserer # Whitespace-Normalisierung wird daraus `- `. Diese Form bridgen wir, # damit "Investiti- onsoffensive" wieder zu "Investitionsoffensive" wird. _RE_HYPHEN_BREAK = re.compile(r"(\w)-\s+(\w)") def _normalize(text: str) -> str: """Lowercased, whitespace-collapsed, hyphen-bridge text for substring matching.""" s = (text or "").lower() s = _RE_WHITESPACE.sub(" ", s).strip() # Mehrfaches Bridging, falls aufeinander folgende Wort-Wraps prev = None while prev != s: prev = s s = _RE_HYPHEN_BREAK.sub(r"\1\2", s) return s def _strip_truncation_markers(text: str) -> str: """Remove leading/trailing ``...`` (and similar truncation markers) from a snippet so the substring check tolerates LLM-typical elision but nothing else.""" return _RE_TRUNCATION.sub("", (text or "")).strip() def _resolve_quelle_to_programm_id(quelle: str) -> Optional[str]: """Match a quelle-Label wie ``"FDP Mecklenburg-Vorpommern Wahlprogramm 2021, S. 73"`` auf einen Key in ``PROGRAMME``. Strategie: zwei-stufig. 1. **Strict substring** (alte Strategie): Wenn der ganze PROGRAMME-Name als Substring in der Quelle steht, gewinnt der längste Match. Das ist der Default-Pfad für Adapter-konformes LLM-Output. 2. **Token-coverage** (neue Fallback-Strategie): Wenn 1. nichts findet, zerlegen wir den PROGRAMME-Namen in Tokens (Partei, Bundesland, Wahlperiode-Indikator, Jahr, Programm-Typ) und prüfen, ob alle inhaltsrelevanten Tokens in der Quelle vorkommen. Tolerant gegenüber Wort-Order-Drift wie ``"Landtagswahlprogramm 2021 BÜNDNIS 90/DIE GRÜNEN Sachsen-Anhalt"`` vs. PROGRAMME-Eintrag ``"Grüne Sachsen-Anhalt Wahlprogramm 2021"``. Returns ``None`` wenn beide Strategien fehlschlagen — das ist das explizite "LLM hat eine Quelle erfunden"-Signal. """ if not quelle: return None quelle_norm = _normalize(quelle) # Stage 1: strict substring best: tuple[int, Optional[str]] = (0, None) for pid, info in PROGRAMME.items(): name = info.get("name", "") if not name: continue name_lower = _normalize(name) if name_lower in quelle_norm and len(name_lower) > best[0]: best = (len(name_lower), pid) if best[1]: return best[1] # Stage 2: token-coverage. Bauen wir einen "Fingerprint" pro # PROGRAMME aus (Partei, Bundesland-Slug, Jahr) und prüfen, ob alle # drei in der Quelle stehen. Verschiedene Schreibweisen werden über # Aliase abgedeckt — ``Grüne``/``BÜNDNIS 90``, ``LSA``/``Sachsen-Anhalt``. for pid, info in PROGRAMME.items(): partei = info.get("partei", "").lower() bundesland = (info.get("bundesland") or "").lower() # Jahr nicht im PROGRAMME-Dict — extrahieren aus dem pid-Suffix # ("gruene-lsa-2021" → "2021") jahr_match = re.search(r"(\d{4})$", pid) jahr = jahr_match.group(1) if jahr_match else "" if not (partei and bundesland and jahr): continue # Partei-Aliase (gleiche Tabelle wie der Mapper aus #55) partei_aliases = {partei} if partei == "grüne": partei_aliases |= {"bündnis 90", "buendnis 90", "grüne", "gruene"} elif partei == "linke": partei_aliases |= {"die linke"} elif partei == "fdp": partei_aliases |= {"f.d.p."} # Bundesland-Aliase: Vollname und Kürzel bl_aliases = {bundesland} bl_long_map = { "nrw": "nordrhein-westfalen", "lsa": "sachsen-anhalt", "be": "berlin", "bb": "brandenburg", "bw": "baden-württemberg", "by": "bayern", "hb": "bremen", "he": "hessen", "hh": "hamburg", "mv": "mecklenburg-vorpommern", "ni": "niedersachsen", "rp": "rheinland-pfalz", "sh": "schleswig-holstein", "sl": "saarland", "sn": "sachsen", "th": "thüringen", } if bundesland in bl_long_map: bl_aliases.add(bl_long_map[bundesland]) has_partei = any(a in quelle_norm for a in partei_aliases) has_bl = any(a in quelle_norm for a in bl_aliases) has_jahr = jahr in quelle_norm if has_partei and has_bl and has_jahr: return pid return None def _extract_page_number(quelle: str) -> Optional[int]: """Pull the ``S. `` page number out of a quelle string.""" if not quelle: return None m = _RE_PAGE_NUMBER.search(quelle) if not m: return None page_str = m.group(1) or m.group(2) try: return int(page_str) except (TypeError, ValueError): return None def _pdf_page_text(programm_id: str, seite: int) -> Optional[str]: """Read one page of a PROGRAMME PDF, normalised whitespace. Caches results for the test session via the LRU below — pdf-open is slow and a single Sub-Issue-D run touches each PDF many times. """ info = PROGRAMME.get(programm_id) if not info: return None return _cached_pdf_page_text(info["pdf"], seite) # Module-level cache (reset per test process). Pytest spawns one process per # session by default, so this is shared across all tests in this module. _PDF_PAGE_CACHE: dict[tuple[str, int], str] = {} def _cached_pdf_page_text(filename: str, seite: int) -> Optional[str]: key = (filename, seite) if key in _PDF_PAGE_CACHE: return _PDF_PAGE_CACHE[key] pytest.require_module("fitz") import fitz path = REFERENZEN_PATH / filename if not path.exists(): return None pdf = fitz.open(str(path)) try: if seite < 1 or seite > len(pdf): return None text = pdf[seite - 1].get_text() finally: pdf.close() normalised = _normalize(text) _PDF_PAGE_CACHE[key] = normalised return normalised def _is_substring(needle: str, haystack: str) -> bool: """Substring check after normalization + truncation marker stripping. Zwei-stufig: 1. **Strict substring**: nach Normalisierung muss der ganze Snippet als Substring im Seitentext stehen. Das ist der Default-Pfad. 2. **Anchor-Match-Fallback**: bei längeren Snippets fallen oft kleine Wort-Drift-Cases auf (LLM kürzt mittendrin, fasst zwei Sätze zusammen, etc.). Dann zerlegen wir den Snippet in 5-Wort-Anker und akzeptieren, wenn ein Anker als Substring vorkommt — dass die Stelle real ist, wäre damit nachgewiesen. Min-Length-Guard von 20 Zeichen verhindert false-positive Matches auf trivialen Snippets ("ja", "und"). """ needle_clean = _strip_truncation_markers(needle) needle_norm = _normalize(needle_clean) haystack = haystack or "" if len(needle_norm) < 20: return True # zu kurz für aussagekräftigen Substring-Test # Stage 1: strict if needle_norm in haystack: return True # Stage 2: Anchor-Match. Wir bauen aus dem Snippet rolling 5-Wort- # Sequenzen und prüfen, ob mindestens eine davon im Haystack steht. # Das toleriert "LLM hat mittendrin gekürzt" oder "LLM hat einen # Bindestrich anders gesetzt", lehnt aber "Snippet ist erfunden" # weiterhin ab — denn ein erfundener Snippet hat keine 5-Wort- # Sequenz, die wortwörtlich auf der Seite stand. words = needle_norm.split() if len(words) < 5: return False for i in range(len(words) - 4): anchor = " ".join(words[i:i + 5]) if anchor in haystack: return True return False # ───────────────────────────────────────────────────────────────────────────── # Helper unit-tests (die Helper selbst sind nicht trivial, also testen wir sie) # ───────────────────────────────────────────────────────────────────────────── class TestHelpers: def test_resolve_quelle_existing_programme(self): # Echtes Beispiel aus prod (FDP MV Wahlprogramm 2021) pid = _resolve_quelle_to_programm_id( "FDP Mecklenburg-Vorpommern Wahlprogramm 2021, S. 73" ) assert pid == "fdp-mv-2021" def test_resolve_quelle_returns_none_for_hallucinated_source(self): # Eine ausgedachte Quelle, die in PROGRAMME nicht existiert pid = _resolve_quelle_to_programm_id( "FDP Sankt-Pauli Hafenwirtschaftsprogramm 1997, S. 42" ) assert pid is None def test_resolve_quelle_picks_longest_match_when_multiple_partial(self): # Mehrere "FDP ... Wahlprogramm"-Einträge in PROGRAMME — der längste # Substring-Match (inkl. BL-Kürzel + Jahr) muss gewinnen, sodass # NRW-Quellen nicht versehentlich auf MV gemappt werden. pid = _resolve_quelle_to_programm_id("FDP NRW Wahlprogramm 2022, S. 5") assert pid == "fdp-nrw-2022" def test_extract_page_number_canonical(self): assert _extract_page_number("CDU MV Wahlprogramm 2021, S. 33") == 33 def test_extract_page_number_seite_long_form(self): assert _extract_page_number("Foo Bar Programm, Seite 7") == 7 def test_extract_page_number_returns_none_when_missing(self): assert _extract_page_number("CDU MV Wahlprogramm 2021") is None def test_normalize_collapses_whitespace_and_lowercases(self): assert _normalize(" HELLO\n\n WORLD ") == "hello world" def test_strip_truncation_markers_removes_leading_dots(self): assert _strip_truncation_markers("... echte aussage") == "echte aussage" def test_strip_truncation_markers_removes_trailing_dots(self): assert _strip_truncation_markers("echte aussage ...") == "echte aussage" def test_is_substring_strict_lowercase_match(self): assert _is_substring("Klimaschutz", "wir wollen klimaschutz und mehr") def test_is_substring_tolerates_truncation_markers(self): assert _is_substring("...mehr klimaschutz...", "wir wollen mehr klimaschutz und gerechtigkeit") def test_is_substring_short_needles_pass(self): # Zu kurz für aussagekräftigen Test → True (statt false-positive) assert _is_substring("ja", "egal was hier steht") def test_is_substring_returns_false_when_clearly_absent(self): assert not _is_substring( "ein ganz langer satz der so nirgends in der quelle steht und definitiv nicht passt", "wir wollen mehr klimaschutz", ) # ───────────────────────────────────────────────────────────────────────────── # Sample Loader — liest reale Assessments aus der gwoe-antraege.db # ───────────────────────────────────────────────────────────────────────────── def _gwoe_db_path() -> Optional[Path]: """Resolve to the local prod-DB if mounted, or return None. Looks at the same path as the prod-Container (``data/gwoe-antraege.db`` relative to the webapp root). Local dev machines without a copy will skip the citation tests cleanly. """ p = Path(__file__).resolve().parent.parent.parent / "data" / "gwoe-antraege.db" return p if p.exists() else None def _load_recent_assessments(limit_per_bl: int = 5) -> list[dict]: """Read the most recent assessments per active BL from gwoe-antraege.db. Returns the parsed wahlprogramm_scores and minimal metadata for the citation iteration. Skips silently if the DB isn't available locally. """ db = _gwoe_db_path() if db is None: return [] out: list[dict] = [] conn = sqlite3.connect(db) try: active_codes = [bl.code for bl in aktive_bundeslaender()] for code in active_codes: rows = conn.execute( """ SELECT drucksache, bundesland, wahlprogramm_scores FROM assessments WHERE bundesland = ? AND wahlprogramm_scores IS NOT NULL ORDER BY updated_at DESC LIMIT ? """, (code, limit_per_bl), ).fetchall() for ds, bl, ws_json in rows: try: ws = json.loads(ws_json) if ws_json else [] except json.JSONDecodeError: continue out.append({"drucksache": ds, "bundesland": bl, "wahlprogramm_scores": ws}) finally: conn.close() return out _ASSESSMENTS_SAMPLE = _load_recent_assessments(limit_per_bl=5) # ───────────────────────────────────────────────────────────────────────────── # Main test — pro Zitat in jedem Sample-Assessment # ───────────────────────────────────────────────────────────────────────────── def _flat_zitate(assessment: dict) -> list[tuple[str, str, dict]]: """Flatten an assessment to a list of (fraktion, kind, zitat) tuples where kind is 'wahlprogramm' or 'parteiprogramm'.""" out: list[tuple[str, str, dict]] = [] for score_entry in assessment.get("wahlprogramm_scores") or []: fraktion = score_entry.get("fraktion") or "?" for kind in ("wahlprogramm", "parteiprogramm"): block = score_entry.get(kind) or {} for z in block.get("zitate") or []: out.append((fraktion, kind, z)) return out def _all_citations() -> list[tuple[str, str, str, str, dict]]: """Cartesian-flatten all sample-assessments × all zitate to one parametrize-friendly list. Returns tuples of: (drucksache, bundesland, fraktion, kind, zitat-dict).""" out: list[tuple[str, str, str, str, dict]] = [] for a in _ASSESSMENTS_SAMPLE: for fraktion, kind, zitat in _flat_zitate(a): out.append((a["drucksache"], a["bundesland"], fraktion, kind, zitat)) return out _CITATIONS = _all_citations() _CITATION_IDS = [ f"{ds}-{bl}-{fr}-{kind}-{i}" for i, (ds, bl, fr, kind, _) in enumerate(_CITATIONS) ] @pytest.mark.skipif( _gwoe_db_path() is None, reason="lokale gwoe-antraege.db nicht vorhanden — Sub-D läuft nur in einer " "Umgebung mit prod-DB-Kopie (siehe data/ Volume im prod-Container)", ) @pytest.mark.skipif( not _CITATIONS, reason="keine Assessments mit zitaten in der lokalen DB gefunden", ) @pytest.mark.parametrize( ("drucksache", "bundesland", "fraktion", "kind", "zitat"), _CITATIONS, ids=_CITATION_IDS, ) def test_zitat_is_substring_of_named_pdf_page( drucksache: str, bundesland: str, fraktion: str, kind: str, zitat: dict, ): """Property-Verification: jedes vom LLM zitierte Snippet muss als Substring auf der angegebenen PDF-Seite tatsächlich vorhanden sein. Wenn dieser Test fehlschlägt, ist genau einer der drei Fehler- Modi aufgetreten: 1. **Halluzinierte Quelle**: das Programm in ``zitat['quelle']`` existiert in PROGRAMME nicht (Bug-Klasse 7/10) 2. **Halluzinierte Seite**: das Programm existiert, aber die angegebene Seite enthält den Snippet nicht 3. **Halluzinierter Inhalt**: das Programm + die Seite sind real, aber der Snippet ist eine Erfindung des LLM Alle drei Modi sind echte Bugs in der LLM-Pipeline. """ quelle = zitat.get("quelle", "") text = zitat.get("text", "") if not quelle or not text: pytest.skip(f"{drucksache}/{fraktion}/{kind}: zitat ohne quelle oder text") pid = _resolve_quelle_to_programm_id(quelle) assert pid is not None, ( f"halluzinierte Quelle in {drucksache}/{fraktion}/{kind}: " f"{quelle!r} matched keinen PROGRAMME-Eintrag" ) # Bonus-Check für Bug-Klasse 17 (Cross-Bundesland-Zitat): das aufgelöste # Programm muss zu dem Bundesland des Antrags passen, oder ein # Grundsatzprogramm sein (bundesland=None). prog_info = PROGRAMME.get(pid, {}) prog_bl = prog_info.get("bundesland") if prog_bl is not None and prog_bl != bundesland: pytest.fail( f"Cross-Bundesland-Zitat in {drucksache} ({bundesland}): das LLM " f"zitiert aus {pid} (bundesland={prog_bl}) — das ist Bug-Klasse 17" ) page = _extract_page_number(quelle) if page is None: pytest.skip( f"{drucksache}/{fraktion}/{kind}: keine Seitennummer in quelle " f"{quelle!r}, kann substring-check nicht ausführen" ) page_text = _pdf_page_text(pid, page) assert page_text is not None, ( f"PDF-Seite {page} in {pid} nicht lesbar (PDF zu kurz oder fehlt)" ) if not _is_substring(text, page_text): # Diff für die Fehlermeldung — gekürzt um die Output-Logs sauber zu halten snippet_preview = text[:200].strip().replace("\n", " ") page_preview = page_text[:200].replace("\n", " ") pytest.fail( f"Zitat in {drucksache}/{fraktion}/{kind} nicht auf " f"{pid} S.{page} auffindbar:\n" f" zitiert: {snippet_preview!r}\n" f" PDF-Seite enthält: {page_preview!r}" )