gwoe-antragspruefer/app/embeddings.py
Dotty Dotter d5c96e6888 refactor(embeddings): PROGRAMME-Literal entfernt, Re-Export aus programme.py
Folge-Schritt zu #222 (Commit bd591b9): Das große Datenliteral in
``embeddings.py`` (~280 Einträge, 712 Zeilen) wird durch einen
Re-Export aus ``programme.PROGRAMME`` ersetzt. Damit existieren die
Programm-Stammdaten endgültig nur noch an einer Stelle.

Schema-Brücke: Die ``chunks``-Tabelle führt ``typ`` als alte Sammel-
Bezeichnung (``wahlprogramm`` oder ``parteiprogramm``).
``programme.PROGRAMME`` differenziert ``grundsatzprogramm-bund`` vs.
``grundsatzprogramm-land``. Beim Re-Export werden beide auf
``parteiprogramm`` gemappt — alte Chunks und neue Indexierungen
tragen denselben typ-String, der Filter in
``get_relevant_quotes_for_antrag`` (typ="parteiprogramm") deckt
beide Grundsatzprogramm-Varianten weiter ab.

embeddings.py schrumpft von 1574 auf 926 Zeilen (−648). Tests
unverändert grün (1217 passed).
2026-05-09 00:46:34 +02:00

927 lines
35 KiB
Python

"""Semantic search for Wahlprogramme and Parteiprogramme using Qwen embeddings."""
import json
import logging
import re
logger = logging.getLogger(__name__)
import sqlite3
import urllib.parse
from pathlib import Path
from typing import Optional
import fitz # PyMuPDF
from openai import OpenAI
from .config import settings
# Embedding-Modell (Issue #123 Migration v3 → v4):
# WRITE = Modell für neue Embeddings (Reindex, neue Assessments, neue Queries)
# READ = Modell, nach dem find_relevant_chunks filtert
# Zwei Settings erlauben Zero-Downtime-Switch. Während der Reindex läuft, bleibt
# READ auf v3 (Prod funktioniert), WRITE produziert v4 parallel. Nach Reindex:
# READ auf v4 flippen, alte v3-Rows löschen.
EMBEDDING_MODEL = settings.embedding_model_write
EMBEDDING_MODEL_READ = settings.embedding_model_read
EMBEDDING_DIMENSIONS = settings.embedding_dimensions
# Database path
EMBEDDINGS_DB = settings.data_dir / "embeddings.db"
# Programme definitions
# Programme-Daten — Re-Export aus programme.PROGRAMME (Single Source of
# Truth seit #222). Das alte embeddings.PROGRAMME-Literal mit ~280
# Einträgen ist hier weg; statt dessen wird die programme-Registry
# beim Modul-Import einmal in das Embeddings-Schema übersetzt.
#
# Schema-Unterschied: Die ``chunks``-Tabelle führt ``typ`` als
# Sammel-Bezeichnung (``wahlprogramm`` oder ``parteiprogramm``).
# programme.PROGRAMME differenziert zusätzlich ``grundsatzprogramm-bund``
# vs. ``grundsatzprogramm-land`` — beim Re-Export werden beide auf
# ``parteiprogramm`` gemappt, damit alte Chunks (vor #222) und neue
# Indexierungen denselben typ-String tragen und der Filter
# ``typ="parteiprogramm"`` weiter beide Varianten abdeckt.
def _to_embeddings_format(prog: dict) -> dict:
info = dict(prog)
typ = info.get("typ", "")
if typ.startswith("grundsatzprogramm"):
info["typ"] = "parteiprogramm"
return info
from .programme import PROGRAMME as _PROGRAMME_SOURCE
PROGRAMME = {pid: _to_embeddings_format(p) for pid, p in _PROGRAMME_SOURCE.items()}
def init_embeddings_db():
"""Initialize the embeddings database.
Includes a forward-only migration step (Issue #5): adds the
``bundesland`` column if missing and backfills existing rows from the
``PROGRAMME`` registry. Grundsatzprogramme (federal level) keep
``bundesland = NULL``; the ``find_relevant_chunks`` query treats NULL
as "matches any state".
"""
conn = sqlite3.connect(EMBEDDINGS_DB)
conn.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id INTEGER PRIMARY KEY,
programm_id TEXT NOT NULL,
partei TEXT NOT NULL,
typ TEXT NOT NULL,
seite INTEGER,
text TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_partei ON chunks(partei)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_typ ON chunks(typ)")
# Migration: bundesland-Spalte ergänzen, falls Tabelle aus Pre-#5-Zeit
cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
if "bundesland" not in cols:
conn.execute("ALTER TABLE chunks ADD COLUMN bundesland TEXT")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_bundesland ON chunks(bundesland)")
# Migration #123: model-Spalte ergänzen. Bestehende Rows bekommen das alte
# v3-Default, neue Rows werden mit EMBEDDING_MODEL (aus config) befüllt.
if "model" not in cols:
conn.execute(
"ALTER TABLE chunks ADD COLUMN model TEXT NOT NULL DEFAULT 'text-embedding-v3'"
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_model ON chunks(model)")
# Backfill: Bundesland aus PROGRAMME-Registry für bestehende Zeilen
# nachtragen. Grundsatzprogramme bleiben NULL.
for prog_id, info in PROGRAMME.items():
bl = info.get("bundesland")
if bl is not None:
conn.execute(
"UPDATE chunks SET bundesland = ? WHERE programm_id = ? AND bundesland IS NULL",
(bl, prog_id),
)
conn.commit()
conn.close()
def get_client() -> OpenAI:
"""Get DashScope client."""
return OpenAI(
api_key=settings.dashscope_api_key,
base_url=settings.dashscope_base_url,
)
def create_embedding(text: str, model: Optional[str] = None) -> list[float]:
"""Create embedding for text using Qwen.
Args:
model: Optionaler Override. Default = EMBEDDING_MODEL (write model).
Während der Migration #123 ruft find_relevant_chunks mit
EMBEDDING_MODEL_READ auf, damit Query-Embeddings im selben
Vektorraum wie die gespeicherten Chunks liegen.
"""
client = get_client()
response = client.embeddings.create(
model=model or EMBEDDING_MODEL,
input=text,
dimensions=EMBEDDING_DIMENSIONS,
)
return response.data[0].embedding
# DashScope text-embedding-v4 erlaubt bis zu 10 Texte pro Batch-Call.
# 10 ist das harte Maximum — bei mehr gibt die API Fehler.
EMBEDDING_BATCH_SIZE = 10
def create_embeddings_batch(texts: list[str], model: Optional[str] = None) -> list[list[float]]:
"""Batch-Embedding — ein API-Call für bis zu EMBEDDING_BATCH_SIZE Texte.
Gibt die Embeddings in derselben Reihenfolge wie die Input-Liste zurück.
Rate-Limit-freundlich: statt 10 sequentielle Calls genügt einer.
"""
if not texts:
return []
if len(texts) > EMBEDDING_BATCH_SIZE:
raise ValueError(f"Batch zu groß: {len(texts)} > {EMBEDDING_BATCH_SIZE}")
client = get_client()
response = client.embeddings.create(
model=model or EMBEDDING_MODEL,
input=texts,
dimensions=EMBEDDING_DIMENSIONS,
)
# DashScope gibt die Embeddings in der Reihenfolge zurück, in der sie
# gesendet wurden (index-basiert). Wir sortieren defensiv nach index.
return [d.embedding for d in sorted(response.data, key=lambda d: d.index)]
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
"""Split text into overlapping chunks by words."""
words = text.split()
chunks = []
i = 0
while i < len(words):
chunk_words = words[i:i + chunk_size]
chunk = " ".join(chunk_words)
if chunk.strip():
chunks.append(chunk)
i += chunk_size - overlap
return chunks
def extract_text_with_pages(pdf_path: Path) -> list[tuple[int, str]]:
"""Extract text from PDF with page numbers."""
doc = fitz.open(pdf_path)
pages = []
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text()
if text.strip():
pages.append((page_num + 1, text))
doc.close()
return pages
def index_programm(programm_id: str, pdf_dir: Path) -> int:
"""Index a single program PDF into embeddings database."""
if programm_id not in PROGRAMME:
raise ValueError(f"Unknown program: {programm_id}")
info = PROGRAMME[programm_id]
pdf_path = pdf_dir / info["pdf"]
if not pdf_path.exists():
logger.warning("PDF not found: %s", pdf_path)
return 0
conn = sqlite3.connect(EMBEDDINGS_DB)
# Remove existing chunks for this program — nur für das aktuelle WRITE-
# Modell, damit parallel existierende v3-Rows während der #123-Migration
# nicht verloren gehen.
conn.execute(
"DELETE FROM chunks WHERE programm_id = ? AND model = ?",
(programm_id, EMBEDDING_MODEL),
)
# Extract and chunk
pages = extract_text_with_pages(pdf_path)
total_chunks = 0
for page_num, page_text in pages:
chunks = chunk_text(page_text, chunk_size=400, overlap=50)
for chunk_text_content in chunks:
if len(chunk_text_content.split()) < 20: # Skip tiny chunks
continue
try:
embedding = create_embedding(chunk_text_content)
embedding_blob = json.dumps(embedding).encode()
conn.execute("""
INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
programm_id,
info["partei"],
info["typ"],
page_num,
chunk_text_content,
embedding_blob,
info.get("bundesland"), # NULL für Grundsatzprogramme
EMBEDDING_MODEL,
))
total_chunks += 1
except Exception as e:
logger.exception("Error embedding chunk")
continue
conn.commit()
conn.close()
logger.info("Indexed %d chunks from %s", total_chunks, programm_id)
return total_chunks
def create_assessment_embedding(
title: str,
zusammenfassung: Optional[str],
themen: Optional[list[str]],
bundesland: Optional[str] = None,
) -> tuple[Optional[bytes], Optional[str]]:
"""Erzeuge ein Assessment-Embedding für Clustering (#105) und Ähnlichkeit (#108).
Kombiniert Titel + Kurzfassung + Themen + Bundesland zu einem einzelnen
String und embedded ihn mit dem aktuellen WRITE-Modell. Gibt `(None, None)`
zurück wenn die Embedding-API fehlschlägt — das Backfill-Script zieht
solche Assessments später nach.
"""
parts = [title or ""]
if zusammenfassung:
parts.append(zusammenfassung)
if themen:
parts.append(", ".join(themen))
if bundesland:
parts.append(f"Bundesland: {bundesland}")
text = "\n".join(p for p in parts if p).strip()
if not text:
return None, None
try:
vec = create_embedding(text, model=EMBEDDING_MODEL)
return json.dumps(vec).encode(), EMBEDDING_MODEL
except Exception:
logger.exception("create_assessment_embedding failed")
return None, None
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x * x for x in a) ** 0.5
norm_b = sum(x * x for x in b) ** 0.5
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def find_relevant_chunks(
query: str,
parteien: list[str] = None,
typ: str = None,
bundesland: str = None,
top_k: int = 3,
min_similarity: float = 0.5,
datum: Optional[str] = None,
) -> list[dict]:
"""Find most relevant chunks for a query.
Args:
bundesland: Wenn gesetzt, werden nur Chunks dieses Bundeslands ODER
globale Chunks (bundesland IS NULL, z.B. Grundsatzprogramme)
berücksichtigt. Wenn None, kein Filter.
datum: ISO-Datum (YYYY-MM-DD). Wenn gesetzt, werden nur Chunks
zurückgegeben, deren ``programm_id`` in einem Programm liegt,
dessen Geltungszeitraum [gueltig_ab, gueltig_bis) das Datum
enthält. Damit erfolgen historische Bewertungen gegen das
zeitpunkt-richtige Programm (ADR 0013). Wenn None: alle
Programme (gegenwärtig und vergangen) durchsuchbar — Default
für Rückwärtskompatibilität.
"""
# Query-Embedding muss im selben Vektorraum wie die gespeicherten Chunks
# liegen — während der Migration #123 ist das EMBEDDING_MODEL_READ.
query_embedding = create_embedding(query, model=EMBEDDING_MODEL_READ)
conn = sqlite3.connect(EMBEDDINGS_DB)
conn.row_factory = sqlite3.Row
# Build query — filtert auf das aktive READ-Modell, damit v3- und
# v4-Embeddings nicht gemischt werden (Cosine wäre Nonsens).
sql = "SELECT * FROM chunks WHERE model = ?"
params = [EMBEDDING_MODEL_READ]
if parteien:
placeholders = ",".join("?" * len(parteien))
sql += f" AND partei IN ({placeholders})"
params.extend(parteien)
if typ:
sql += " AND typ = ?"
params.append(typ)
if bundesland:
# Bundesland-spezifische ODER globale Chunks (Grundsatzprogramme).
sql += " AND (bundesland = ? OR bundesland IS NULL)"
params.append(bundesland)
if datum:
# Welche programm_ids gelten zu diesem Datum? Pre-compute aus PROGRAMME.
valid_pids = []
for pid, info in PROGRAMME.items():
ab = info.get("gueltig_ab")
if not ab:
continue
bis = info.get("gueltig_bis")
if datum < ab:
continue
if bis is not None and datum >= bis:
continue
valid_pids.append(pid)
if valid_pids:
placeholders = ",".join("?" * len(valid_pids))
sql += f" AND programm_id IN ({placeholders})"
params.extend(valid_pids)
else:
# Kein Programm gilt zu diesem Datum — leere Resultmenge.
conn.close()
return []
rows = conn.execute(sql, params).fetchall()
conn.close()
# Calculate similarities
results = []
for row in rows:
chunk_embedding = json.loads(row["embedding"])
similarity = cosine_similarity(query_embedding, chunk_embedding)
if similarity >= min_similarity:
results.append({
"programm_id": row["programm_id"],
"partei": row["partei"],
"typ": row["typ"],
"seite": row["seite"],
"text": row["text"],
"similarity": similarity,
})
# Sort by similarity and return top_k
results.sort(key=lambda x: x["similarity"], reverse=True)
return results[:top_k]
def get_relevant_quotes_for_antrag(
antrag_text: str,
fraktionen: list[str],
bundesland: str,
top_k_per_partei: int = 2,
datum: Optional[str] = None,
) -> dict[str, list[dict]]:
"""Get relevant quotes from Wahl- and Parteiprogramme for an Antrag.
Args:
bundesland: Pflicht. Bestimmt, welche Wahlprogramme durchsucht werden
und welche Regierungsfraktionen zusätzlich zu den Antragstellern
einbezogen werden.
datum: ISO-Datum des Antrags. Wenn gesetzt, werden nur Programme
durchsucht, deren Geltungszeitraum [gueltig_ab, gueltig_bis)
das Datum enthält — historische Anträge werden gegen das
zeitpunkt-richtige Programm bewertet (ADR 0013). Wenn None:
alle Programme dieser Partei (Default — Rückwärtskompat).
"""
# Lokaler Import vermeidet Zirkularität: bundeslaender.py importiert nichts
# aus diesem Modul, aber der saubere Trennstrich bleibt erhalten.
from .bundeslaender import BUNDESLAENDER
if bundesland not in BUNDESLAENDER:
raise ValueError(f"Unbekanntes Bundesland: {bundesland}")
regierungsfraktionen = BUNDESLAENDER[bundesland].regierungsfraktionen
parteien_to_search = list(dict.fromkeys(fraktionen + regierungsfraktionen)) # dedupe, Reihenfolge stabil
results = {}
from .parteien import normalize_partei
for partei in parteien_to_search:
# Kanonischer Lookup-Key über den zentralen Mapper (#55). Ersetzt
# den alten Hack ``partei.upper() if partei != "GRÜNE" else "GRÜNE"``,
# der nur die Schreibweisen-Drift in einer einzigen Partei
# abgefangen hat. Wenn der Mapper nichts findet, fallen wir auf
# den Originalstring zurück — die DB-Lookup-Schicht macht ohnehin
# eigene Case-insensitive-Vergleiche.
canonical = normalize_partei(partei, bundesland=bundesland)
partei_lookup = canonical or partei
# Wahlprogramm — bundesland-gefiltert + ggf. zeitpunkt-gefiltert
wahl_chunks = find_relevant_chunks(
antrag_text,
parteien=[partei_lookup],
typ="wahlprogramm",
bundesland=bundesland,
top_k=top_k_per_partei,
min_similarity=0.35,
datum=datum,
)
# Parteiprogramm (Grundsatz, federal — bundesland=NULL matched implizit).
# Hier wird ``datum`` ebenfalls weitergereicht: zum Antragszeitpunkt
# noch nicht gültige Grundsatzprogramme (z.B. cdu-grundsatz von 2024
# bei einem Antrag aus 2010) sollen nicht zitiert werden.
partei_chunks = find_relevant_chunks(
antrag_text,
parteien=[partei_lookup],
typ="parteiprogramm",
bundesland=bundesland,
top_k=top_k_per_partei,
min_similarity=0.35,
datum=datum,
)
if wahl_chunks or partei_chunks:
results[partei_lookup] = {
"wahlprogramm": wahl_chunks,
"parteiprogramm": partei_chunks,
}
return results
def _chunk_source_label(chunk: dict) -> str:
"""Build a fully-qualified source label like 'FDP MV Wahlprogramm 2021, S. 73'.
Without the programme name + Bundesland in the prompt, the LLM
halluzinates familiar sources from its training (typically NRW 2022)
even when the retrieved chunks all come from a different state.
"""
prog_id = chunk.get("programm_id", "")
info = PROGRAMME.get(prog_id, {})
name = info.get("name") or prog_id
seite = chunk.get("seite", "?")
return f"{name}, S. {seite}"
def _chunk_pdf_url(chunk: dict) -> Optional[str]:
"""Build the canonical PDF URL with page anchor for a chunk.
Wenn der Chunk einen ``text`` enthält, wird stattdessen die
Highlight-Endpoint-URL ``/api/wahlprogramm-cite?pid=…&seite=…&q=…``
emittiert (Issue #47). Der Endpoint rendert die Wahlprogramm-Seite
mit gelb markiertem Zitat und liefert ein 1-Seiten-PDF. Klick im
Report öffnet die Quelle direkt mit visuell hervorgehobener Stelle.
Fallback: ohne text → statische ``/static/referenzen/<pdf>#page=<n>``
URL (rückwärts-kompatibel für Pre-#47 Assessments).
"""
prog_id = chunk.get("programm_id", "")
info = PROGRAMME.get(prog_id)
if not info:
return None
pdf = info.get("pdf")
if not pdf:
return None
seite = chunk.get("seite")
text = (chunk.get("text") or "").strip()
if text and seite:
# Highlight-Endpoint mit URL-encoded query. Den Text auf 200 Zeichen
# abschneiden — search_for matched ohnehin nur Substring-Anker, und
# die URL bleibt bounded (sonst würden 500-Zeichen-Snippets in jeder
# Zitat-URL stehen und das HTML-Report-JSON aufblähen).
q = urllib.parse.quote_plus(text[:200])
return f"/api/wahlprogramm-cite?pid={prog_id}&seite={seite}&q={q}#page={seite}"
if seite:
return f"/static/referenzen/{pdf}#page={seite}"
return f"/static/referenzen/{pdf}"
def render_highlighted_page(programm_id: str, seite: int, query: str) -> Optional[bytes]:
"""Render a single Wahlprogramm-page with yellow highlights for a query.
Used by the ``/api/wahlprogramm-cite`` endpoint to serve a one-page
PDF where the cited snippet is visually highlighted via PyMuPDF
``add_highlight_annot``. Returns the serialized PDF bytes, or None
if the programme/page can't be resolved.
Returns a tuple ``(pdf_bytes, found_page, highlighted)`` where
``found_page`` is the 1-indexed page number and ``highlighted`` is
True if the text was found and annotated. Returns ``(None, 0, False)``
if the programme/page can't be resolved.
Args:
programm_id: Key into PROGRAMME registry — validated by caller.
seite: 1-indexed page number within the programme PDF.
query: Snippet text to search and highlight on the page. Long
queries are truncated to the first 200 characters before the
search; PyMuPDF's ``search_for`` falls over on huge needles
anyway and a short anchor is what we want for the visual hit.
"""
info = PROGRAMME.get(programm_id)
if not info:
return None, 0, False
pdf_filename = info.get("pdf")
if not pdf_filename:
return None, 0, False
referenzen = Path(__file__).parent / "static" / "referenzen"
pdf_path = referenzen / pdf_filename
if not pdf_path.exists():
return None, 0, False
needle = (query or "").strip()[:200]
try:
src = fitz.open(str(pdf_path))
except Exception:
# Manche PDFs (z.B. CDU Grundsatzprogramm 2007) lassen sich nicht
# mit PyMuPDF öffnen. Fallback: Original-PDF ohne Highlighting.
logger.exception("render_highlighted_page: kann %s nicht öffnen", pdf_path.name)
return pdf_path.read_bytes(), seite, False
try:
if seite < 1 or seite > len(src):
return None, 0, False
# Suche den Needle auf der angegebenen Seite. Falls dort nichts
# gefunden wird (Pre-#60-Assessments haben oft falsche Seiten-
# nummern), durchsuchen wir ALLE Seiten und nehmen die erste
# mit einem Treffer — so funktioniert Highlighting auch bei
# halluzinierten Seitenzahlen retroaktiv.
target_page_idx = seite - 1
rects = []
if needle:
clean = needle.replace("\u00ad", "")
# LLMs ziehen h\u00e4ufig die Seitenzahl-Header (\u201e44 Gute Bildung \u2026")
# mit ins Zitat. Wenn die ersten Tokens reine Ziffern sind,
# strippen wir sie f\u00fcr die Suche \u2014 sonst matched search_for nicht.
import re as _re
clean = _re.sub(r"^\s*\d+\s+", "", clean).strip()
words = clean.split()
anchor = " ".join(words[:5]) if len(words) >= 5 else clean
# Versuch 1: angegebene Seite, Volltext (gestrippt)
rects = src[target_page_idx].search_for(clean)
# Versuch 2: angegebene Seite, 5-Wort-Anker
if not rects:
rects = src[target_page_idx].search_for(anchor)
# Versuch 3: alle Seiten durchsuchen
if not rects:
for i in range(len(src)):
rects = src[i].search_for(anchor)
if rects:
target_page_idx = i
break
# Volles PDF mit Highlight-Annotation.
page = src[target_page_idx]
if needle and rects:
for rect in rects:
annot = page.add_highlight_annot(rect)
if annot is not None:
annot.set_colors(stroke=(1.0, 0.93, 0.0)) # gelb
annot.update()
# PDF-OpenAction setzen, damit der Reader direkt auf der richtigen
# Seite startet (statt Seite 1) — sonst sieht der User „PDF öffnet,
# aber falsche Seite". /Fit = passt-zur-Größe.
try:
page_xref = page.xref
catalog_xref = src.pdf_catalog()
src.xref_set_key(catalog_xref, "OpenAction", f"[{page_xref} 0 R /Fit]")
except Exception:
logger.exception("render_highlighted_page: OpenAction-Setzen fehlgeschlagen")
highlighted = bool(needle and rects)
try:
return src.tobytes(), target_page_idx + 1, highlighted
except (AssertionError, Exception):
# PyMuPDF kann manche PDFs nicht serialisieren (z.B. CDU 2007).
# Fallback: Original-PDF ohne Annotations zurückgeben.
logger.warning("render_highlighted_page: tobytes() failed für %s, sende Original", pdf_path.name)
return pdf_path.read_bytes(), target_page_idx + 1, False
finally:
src.close()
# ─────────────────────────────────────────────────────────────────────────────
# Citation post-processing — Issue #60 Option B
#
# Pre-#60 the LLM was free to fabricate `quelle`/`url` strings even when the
# `text` was a real snippet from a retrieved chunk. The A+C fix made the
# prompt more strict, but BB 8/673 (post-deploy) showed the LLM still
# cross-mixed: it copied text from chunk Qn but wrote the page from chunk Qm
# in the `quelle` field.
#
# The structural fix is to take quelle/url generation away from the LLM
# entirely. After the LLM responds, we walk over every Zitat and try to
# locate its `text` (substring or 5-word anchor) in any of the chunks the
# LLM was actually shown. If we find a match, we *overwrite* quelle and url
# with the canonical values from that chunk. If we don't find a match, the
# Zitat is dropped — it cannot be backed by retrieved evidence.
# ─────────────────────────────────────────────────────────────────────────────
_RE_WHITESPACE = re.compile(r"\s+")
_RE_HYPHEN_BREAK = re.compile(r"(\w)-\s+(\w)")
_RE_TRUNCATION = re.compile(r"^\s*\.{2,}|\.{2,}\s*$")
def _normalize_for_match(text: str) -> str:
"""Lowercase, collapse whitespace, bridge soft-hyphen line breaks.
Mirrors the matcher used in tests/integration/test_citations_substring.py
so that the analyzer's post-processing and Sub-D's verification stay in
lockstep.
"""
s = (text or "").lower()
s = _RE_TRUNCATION.sub("", s)
s = s.replace("\u00ad", "") # soft hyphen
s = _RE_WHITESPACE.sub(" ", s).strip()
prev = None
while prev != s:
prev = s
s = _RE_HYPHEN_BREAK.sub(r"\1\2", s)
return s
def find_chunk_for_text(text: str, chunks: list[dict]) -> Optional[dict]:
"""Locate the retrieved chunk that a Zitat snippet was copied from.
Two-stage match identical to Sub-D:
1. **Strict substring** — full needle as substring of any chunk.
2. **5-word anchor** — any 5 consecutive words of the needle as
substring of any chunk.
Snippets shorter than 20 characters are rejected (too weak to bind).
Returns the matching chunk dict, or None.
"""
needle = _normalize_for_match(text)
if len(needle) < 20:
return None
chunks_norm = [(c, _normalize_for_match(c.get("text", ""))) for c in chunks]
for c, norm in chunks_norm:
if needle in norm:
return c
words = needle.split()
if len(words) < 4:
return None
for i in range(len(words) - 3):
anchor = " ".join(words[i:i + 4])
for c, norm in chunks_norm:
if anchor in norm:
return c
return None
def reconstruct_zitate(data: dict, semantic_quotes: dict) -> dict:
"""Verify and reconstruct LLM-emitted zitate against retrieved chunks.
Matching ist strikt **partei-skopiert** — ein Zitat im AfD-Block darf
nur gegen AfD-Chunks gematcht werden, niemals gegen CDU/SPD-Chunks.
Sonst landet ein zufaellig wortgleicher Text aus einem fremden Programm
mit fremder ``quelle`` im falschen Block (Cross-Partei-Misattribution).
Match-Reihenfolge pro Zitat:
1. Partei + exakte Programm-Kategorie (z.B. AfD-Parteiprogramm-Chunks
fuer ein Zitat im AfD-Parteiprogramm-Block) → ``verified: true`` mit
kanonischer ``quelle``/``url`` aus dem Chunk.
2. Partei + andere Programm-Kategorie (z.B. AfD hat nur Grundsatz-/
Parteiprogramm im Index, der LLM hat den Text aber im Wahlprogramm-
Block emittiert) → ``verified: true`` mit korrigierter ``quelle``,
Block bleibt wie vom LLM gesetzt.
3. Kein Match in der eigenen Partei → **Zitat verwerfen**. Lieber 0
Zitate als eines mit falscher Partei-Zuschreibung. Vorher wurde
solche Zitate als ``verified: false`` mit der LLM-quelle behalten —
das fuehrte z.B. zu CDU-quellen in AfD-Bloecken (#175-bug).
"""
if not semantic_quotes:
return data
# Pool pro Partei aufbauen — Lookup geht direkt + ueber normalize_partei,
# damit Aliase ("BÜNDNIS 90/DIE GRÜNEN" ↔ "GRÜNE") beidseitig matchen.
chunks_by_party: dict[str, dict[str, list]] = {}
for partei, d in (semantic_quotes or {}).items():
chunks_by_party[partei] = {
"wahlprogramm": list(d.get("wahlprogramm", []) or []),
"parteiprogramm": list(d.get("parteiprogramm", []) or []),
}
if not chunks_by_party:
return data
try:
from .parteien import normalize_partei
except Exception:
normalize_partei = lambda x: x # noqa: E731
def _pool_for(fraktion: str) -> dict[str, list]:
# Versuch direkt, dann normalisiert. Wenn weder noch — leerer Pool.
if fraktion in chunks_by_party:
return chunks_by_party[fraktion]
norm = normalize_partei(fraktion) or fraktion
if norm in chunks_by_party:
return chunks_by_party[norm]
# Reverse-Lookup: vielleicht ist `chunks_by_party` mit normalisiertem
# Key bestueckt waehrend `fraktion` der Original-Name ist.
for key, val in chunks_by_party.items():
if normalize_partei(key) == norm:
return val
return {"wahlprogramm": [], "parteiprogramm": []}
for fs in data.get("wahlprogrammScores", []) or []:
partei_name = fs.get("fraktion", "")
partei_pool = _pool_for(partei_name)
for kind in ("wahlprogramm", "parteiprogramm"):
blk = fs.get(kind) or {}
zitate = blk.get("zitate") or []
allowed = partei_pool.get(kind) or []
cross_kind = "parteiprogramm" if kind == "wahlprogramm" else "wahlprogramm"
fallback = partei_pool.get(cross_kind) or []
cleaned = []
for z in zitate:
text = z.get("text", "") or ""
# 1. Strikter Match in (Partei, eigenes Programm)
matched = find_chunk_for_text(text, allowed) if allowed else None
if matched is None and fallback:
# 2. Fallback: gleiche Partei, andere Programm-Kategorie
matched = find_chunk_for_text(text, fallback)
if matched is None:
# 3. Kein Match in der eigenen Partei → verwerfen.
logger.warning(
"Zitat verworfen (kein Partei-Match): fraktion=%r "
"kind=%r text=%r llm_quelle=%r",
partei_name, kind, text[:80], z.get("quelle"),
)
continue
z["quelle"] = _chunk_source_label(matched)
url = _chunk_pdf_url(matched)
if url:
z["url"] = url
z["verified"] = True
cleaned.append(z)
blk["zitate"] = cleaned
return data
def format_quotes_for_prompt(
quotes: dict,
searched_parties: Optional[list[str]] = None,
) -> str:
"""Format quotes for inclusion in LLM prompt.
Each chunk gets a stable ENUM-ID ([Q1], [Q2], …) and the prompt
instructs the LLM to anchor every citation in one of those IDs and
to copy the snippet **verbatim** from the cited chunk. This is the
structural fix for Issue #60: pre-#60 the LLM was free to invent
snippets under real source labels because nothing in the prompt
bound a citation to a specific retrieved chunk.
Each quote is annotated with the fully-qualified source (programme
name + page) so the LLM cannot fall back on training-set defaults
when constructing its citations.
Issue #63 erweitert: wenn ``searched_parties`` übergeben wird, werden
Parteien, für die **kein** Chunk retrievt wurde, im Prompt explizit
als "keine Quellen im Index" markiert. Das LLM wird angewiesen, für
diese Parteien ``score: null`` zu setzen statt aus dem Trainingswissen
zu raten.
"""
if not quotes and not searched_parties:
return ""
lines = ["\n## Relevante Passagen aus Wahl- und Parteiprogrammen\n"]
lines.append(
"**ZITATEREGEL** — verbindlich für alle Zitate in `wahlprogramm`/"
"`parteiprogramm`-Blöcken:\n"
"1. Jedes Zitat MUSS auf genau einen der unten aufgelisteten "
"Chunks verweisen (Format `[Q1]`, `[Q2]`, …).\n"
"2. Der `text`-String MUSS eine **wörtliche, zusammenhängende** "
"Passage von mindestens 5 Wörtern aus genau diesem Chunk sein — "
"keine Paraphrasen, keine Zusammenfassungen, keine "
"Cross-References aus dem Gedächtnis.\n"
"3. Der `quelle`-String MUSS exakt das Source-Label des "
"gewählten Chunks sein (Programm-Name + Seitenzahl, wie unten "
"ausgeschrieben).\n"
"4. Wenn kein Chunk wirklich passt: lass das Zitat-Array leer. "
"Lieber 0 Zitate als ein erfundenes Zitat.\n"
"5. **Wenn für eine Fraktion unten KEINE QUELLEN VORHANDEN "
"steht**: setze `score: 0` für `wahlprogramm` UND "
"`parteiprogramm` dieser Fraktion und schreibe in die "
"`begründung`: 'Keine Quellen im Index — Bewertung nicht "
"möglich.' Erfinde KEINEN Score aus dem Trainingswissen.\n"
)
counter = 0
for partei, data in quotes.items():
lines.append(f"\n### {partei}\n")
if data.get("wahlprogramm"):
lines.append("**Wahlprogramm:**")
for chunk in data["wahlprogramm"]:
counter += 1
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
lines.append(f'- [Q{counter}] {_chunk_source_label(chunk)}: "{text}"')
if data.get("parteiprogramm"):
lines.append("\n**Grundsatzprogramm:**")
for chunk in data["parteiprogramm"]:
counter += 1
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
lines.append(f'- [Q{counter}] {_chunk_source_label(chunk)}: "{text}"')
# Issue #63: Parteien ohne jegliche retrievte Chunks explizit markieren,
# damit das LLM nicht aus Trainingswissen halluziniert.
if searched_parties:
parties_with_chunks = set(quotes.keys())
missing = [p for p in searched_parties if p not in parties_with_chunks]
if missing:
lines.append("\n### KEINE QUELLEN VORHANDEN\n")
lines.append(
"Für folgende Fraktionen sind weder Wahl- noch "
"Grundsatzprogramm-Passagen im Index vorhanden. "
"Bewerte sie mit `score: 0` und `zitate: []`:\n"
)
for p in missing:
lines.append(f"- **{p}**: KEINE QUELLEN — score 0, keine Zitate.")
return "\n".join(lines)
def get_programme_info() -> list[dict]:
"""Get list of all indexed programmes with metadata."""
info_list = []
for prog_id, info in PROGRAMME.items():
info_list.append({
"id": prog_id,
"name": info["name"],
"typ": info["typ"],
"partei": info["partei"],
"bundesland": info.get("bundesland"),
"pdf": info["pdf"],
"pdf_url": f"/static/referenzen/{info['pdf']}",
})
return info_list
def get_indexing_status() -> dict:
"""Get status of indexed programmes."""
if not EMBEDDINGS_DB.exists():
return {"indexed": 0, "programmes": []}
conn = sqlite3.connect(EMBEDDINGS_DB)
# Count chunks per program
rows = conn.execute("""
SELECT programm_id, COUNT(*) as chunks
FROM chunks
GROUP BY programm_id
""").fetchall()
conn.close()
indexed = {row[0]: row[1] for row in rows}
programmes = []
for prog_id, info in PROGRAMME.items():
programmes.append({
"id": prog_id,
"name": info["name"],
"partei": info["partei"],
"chunks": indexed.get(prog_id, 0),
"indexed": prog_id in indexed,
})
return {
"indexed": len(indexed),
"total": len(PROGRAMME),
"programmes": programmes,
}