Bug auf Antrag 18/18246: Bei den GRUENEN landeten unter 'Wahlprogramm' zwei Zitate — eines aus dem NRW-Wahlprogramm 2022, eines aus dem Bundes-Grundsatzprogramm 2020. Letzteres haette in den parteiprogramm-Block gehoert. Ursache in reconstruct_zitate: bei einem Cross-Kind-Fallback-Match wurde nur die quelle korrigiert, das Zitat blieb aber im urspruenglichen Block (dort wo der LLM es hingeschrieben hatte). Fix: zwei Pass-Verarbeitung. Pass 1 sammelt alle Zitate ueber beide Bloecke und klassifiziert nach tatsaechlich matchendem Pool, Pass 2 schreibt sie in die jeweils passenden Bloecke. Damit landen Wahl- und Grundsatzprogramm-Zitate konsequent in getrennten Bloecken. Test: tests/test_embeddings.py::TestReconstructZitate::test_zitat_aus_grundsatzprogramm_landet_im_parteiprogramm_block reproduziert den 18/18246-Fall.
945 lines
36 KiB
Python
945 lines
36 KiB
Python
"""Semantic search for Wahlprogramme and Parteiprogramme using Qwen embeddings."""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
import sqlite3
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import fitz # PyMuPDF
|
|
from openai import OpenAI
|
|
|
|
from .config import settings
|
|
|
|
# Embedding-Modell (Issue #123 Migration v3 → v4):
|
|
# WRITE = Modell für neue Embeddings (Reindex, neue Assessments, neue Queries)
|
|
# READ = Modell, nach dem find_relevant_chunks filtert
|
|
# Zwei Settings erlauben Zero-Downtime-Switch. Während der Reindex läuft, bleibt
|
|
# READ auf v3 (Prod funktioniert), WRITE produziert v4 parallel. Nach Reindex:
|
|
# READ auf v4 flippen, alte v3-Rows löschen.
|
|
EMBEDDING_MODEL = settings.embedding_model_write
|
|
EMBEDDING_MODEL_READ = settings.embedding_model_read
|
|
EMBEDDING_DIMENSIONS = settings.embedding_dimensions
|
|
|
|
# Database path
|
|
EMBEDDINGS_DB = settings.data_dir / "embeddings.db"
|
|
|
|
# Programme definitions
|
|
# Programme-Daten — Re-Export aus programme.PROGRAMME (Single Source of
|
|
# Truth seit #222). Das alte embeddings.PROGRAMME-Literal mit ~280
|
|
# Einträgen ist hier weg; statt dessen wird die programme-Registry
|
|
# beim Modul-Import einmal in das Embeddings-Schema übersetzt.
|
|
#
|
|
# Schema-Unterschied: Die ``chunks``-Tabelle führt ``typ`` als
|
|
# Sammel-Bezeichnung (``wahlprogramm`` oder ``parteiprogramm``).
|
|
# programme.PROGRAMME differenziert zusätzlich ``grundsatzprogramm-bund``
|
|
# vs. ``grundsatzprogramm-land`` — beim Re-Export werden beide auf
|
|
# ``parteiprogramm`` gemappt, damit alte Chunks (vor #222) und neue
|
|
# Indexierungen denselben typ-String tragen und der Filter
|
|
# ``typ="parteiprogramm"`` weiter beide Varianten abdeckt.
|
|
def _to_embeddings_format(prog: dict) -> dict:
|
|
info = dict(prog)
|
|
typ = info.get("typ", "")
|
|
if typ.startswith("grundsatzprogramm"):
|
|
info["typ"] = "parteiprogramm"
|
|
return info
|
|
|
|
|
|
from .programme import PROGRAMME as _PROGRAMME_SOURCE
|
|
PROGRAMME = {pid: _to_embeddings_format(p) for pid, p in _PROGRAMME_SOURCE.items()}
|
|
|
|
|
|
|
|
|
|
def init_embeddings_db():
|
|
"""Initialize the embeddings database.
|
|
|
|
Includes a forward-only migration step (Issue #5): adds the
|
|
``bundesland`` column if missing and backfills existing rows from the
|
|
``PROGRAMME`` registry. Grundsatzprogramme (federal level) keep
|
|
``bundesland = NULL``; the ``find_relevant_chunks`` query treats NULL
|
|
as "matches any state".
|
|
"""
|
|
conn = sqlite3.connect(EMBEDDINGS_DB)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS chunks (
|
|
id INTEGER PRIMARY KEY,
|
|
programm_id TEXT NOT NULL,
|
|
partei TEXT NOT NULL,
|
|
typ TEXT NOT NULL,
|
|
seite INTEGER,
|
|
text TEXT NOT NULL,
|
|
embedding BLOB NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_partei ON chunks(partei)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_typ ON chunks(typ)")
|
|
|
|
# Migration: bundesland-Spalte ergänzen, falls Tabelle aus Pre-#5-Zeit
|
|
cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
|
|
if "bundesland" not in cols:
|
|
conn.execute("ALTER TABLE chunks ADD COLUMN bundesland TEXT")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_bundesland ON chunks(bundesland)")
|
|
|
|
# Migration #123: model-Spalte ergänzen. Bestehende Rows bekommen das alte
|
|
# v3-Default, neue Rows werden mit EMBEDDING_MODEL (aus config) befüllt.
|
|
if "model" not in cols:
|
|
conn.execute(
|
|
"ALTER TABLE chunks ADD COLUMN model TEXT NOT NULL DEFAULT 'text-embedding-v3'"
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_model ON chunks(model)")
|
|
|
|
# Backfill: Bundesland aus PROGRAMME-Registry für bestehende Zeilen
|
|
# nachtragen. Grundsatzprogramme bleiben NULL.
|
|
for prog_id, info in PROGRAMME.items():
|
|
bl = info.get("bundesland")
|
|
if bl is not None:
|
|
conn.execute(
|
|
"UPDATE chunks SET bundesland = ? WHERE programm_id = ? AND bundesland IS NULL",
|
|
(bl, prog_id),
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def get_client() -> OpenAI:
|
|
"""Get DashScope client."""
|
|
return OpenAI(
|
|
api_key=settings.dashscope_api_key,
|
|
base_url=settings.dashscope_base_url,
|
|
)
|
|
|
|
|
|
def create_embedding(text: str, model: Optional[str] = None) -> list[float]:
|
|
"""Create embedding for text using Qwen.
|
|
|
|
Args:
|
|
model: Optionaler Override. Default = EMBEDDING_MODEL (write model).
|
|
Während der Migration #123 ruft find_relevant_chunks mit
|
|
EMBEDDING_MODEL_READ auf, damit Query-Embeddings im selben
|
|
Vektorraum wie die gespeicherten Chunks liegen.
|
|
"""
|
|
client = get_client()
|
|
response = client.embeddings.create(
|
|
model=model or EMBEDDING_MODEL,
|
|
input=text,
|
|
dimensions=EMBEDDING_DIMENSIONS,
|
|
)
|
|
return response.data[0].embedding
|
|
|
|
|
|
# DashScope text-embedding-v4 erlaubt bis zu 10 Texte pro Batch-Call.
|
|
# 10 ist das harte Maximum — bei mehr gibt die API Fehler.
|
|
EMBEDDING_BATCH_SIZE = 10
|
|
|
|
|
|
def create_embeddings_batch(texts: list[str], model: Optional[str] = None) -> list[list[float]]:
|
|
"""Batch-Embedding — ein API-Call für bis zu EMBEDDING_BATCH_SIZE Texte.
|
|
|
|
Gibt die Embeddings in derselben Reihenfolge wie die Input-Liste zurück.
|
|
Rate-Limit-freundlich: statt 10 sequentielle Calls genügt einer.
|
|
"""
|
|
if not texts:
|
|
return []
|
|
if len(texts) > EMBEDDING_BATCH_SIZE:
|
|
raise ValueError(f"Batch zu groß: {len(texts)} > {EMBEDDING_BATCH_SIZE}")
|
|
client = get_client()
|
|
response = client.embeddings.create(
|
|
model=model or EMBEDDING_MODEL,
|
|
input=texts,
|
|
dimensions=EMBEDDING_DIMENSIONS,
|
|
)
|
|
# DashScope gibt die Embeddings in der Reihenfolge zurück, in der sie
|
|
# gesendet wurden (index-basiert). Wir sortieren defensiv nach index.
|
|
return [d.embedding for d in sorted(response.data, key=lambda d: d.index)]
|
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
|
|
"""Split text into overlapping chunks by words."""
|
|
words = text.split()
|
|
chunks = []
|
|
|
|
i = 0
|
|
while i < len(words):
|
|
chunk_words = words[i:i + chunk_size]
|
|
chunk = " ".join(chunk_words)
|
|
if chunk.strip():
|
|
chunks.append(chunk)
|
|
i += chunk_size - overlap
|
|
|
|
return chunks
|
|
|
|
|
|
def extract_text_with_pages(pdf_path: Path) -> list[tuple[int, str]]:
|
|
"""Extract text from PDF with page numbers."""
|
|
doc = fitz.open(pdf_path)
|
|
pages = []
|
|
|
|
for page_num in range(len(doc)):
|
|
page = doc[page_num]
|
|
text = page.get_text()
|
|
if text.strip():
|
|
pages.append((page_num + 1, text))
|
|
|
|
doc.close()
|
|
return pages
|
|
|
|
|
|
def index_programm(programm_id: str, pdf_dir: Path) -> int:
|
|
"""Index a single program PDF into embeddings database."""
|
|
if programm_id not in PROGRAMME:
|
|
raise ValueError(f"Unknown program: {programm_id}")
|
|
|
|
info = PROGRAMME[programm_id]
|
|
pdf_path = pdf_dir / info["pdf"]
|
|
|
|
if not pdf_path.exists():
|
|
logger.warning("PDF not found: %s", pdf_path)
|
|
return 0
|
|
|
|
conn = sqlite3.connect(EMBEDDINGS_DB)
|
|
|
|
# Remove existing chunks for this program — nur für das aktuelle WRITE-
|
|
# Modell, damit parallel existierende v3-Rows während der #123-Migration
|
|
# nicht verloren gehen.
|
|
conn.execute(
|
|
"DELETE FROM chunks WHERE programm_id = ? AND model = ?",
|
|
(programm_id, EMBEDDING_MODEL),
|
|
)
|
|
|
|
# Extract and chunk
|
|
pages = extract_text_with_pages(pdf_path)
|
|
total_chunks = 0
|
|
|
|
for page_num, page_text in pages:
|
|
chunks = chunk_text(page_text, chunk_size=400, overlap=50)
|
|
|
|
for chunk_text_content in chunks:
|
|
if len(chunk_text_content.split()) < 20: # Skip tiny chunks
|
|
continue
|
|
|
|
try:
|
|
embedding = create_embedding(chunk_text_content)
|
|
embedding_blob = json.dumps(embedding).encode()
|
|
|
|
conn.execute("""
|
|
INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland, model)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
programm_id,
|
|
info["partei"],
|
|
info["typ"],
|
|
page_num,
|
|
chunk_text_content,
|
|
embedding_blob,
|
|
info.get("bundesland"), # NULL für Grundsatzprogramme
|
|
EMBEDDING_MODEL,
|
|
))
|
|
total_chunks += 1
|
|
except Exception as e:
|
|
logger.exception("Error embedding chunk")
|
|
continue
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
logger.info("Indexed %d chunks from %s", total_chunks, programm_id)
|
|
return total_chunks
|
|
|
|
|
|
def create_assessment_embedding(
|
|
title: str,
|
|
zusammenfassung: Optional[str],
|
|
themen: Optional[list[str]],
|
|
bundesland: Optional[str] = None,
|
|
) -> tuple[Optional[bytes], Optional[str]]:
|
|
"""Erzeuge ein Assessment-Embedding für Clustering (#105) und Ähnlichkeit (#108).
|
|
|
|
Kombiniert Titel + Kurzfassung + Themen + Bundesland zu einem einzelnen
|
|
String und embedded ihn mit dem aktuellen WRITE-Modell. Gibt `(None, None)`
|
|
zurück wenn die Embedding-API fehlschlägt — das Backfill-Script zieht
|
|
solche Assessments später nach.
|
|
"""
|
|
parts = [title or ""]
|
|
if zusammenfassung:
|
|
parts.append(zusammenfassung)
|
|
if themen:
|
|
parts.append(", ".join(themen))
|
|
if bundesland:
|
|
parts.append(f"Bundesland: {bundesland}")
|
|
text = "\n".join(p for p in parts if p).strip()
|
|
if not text:
|
|
return None, None
|
|
|
|
try:
|
|
vec = create_embedding(text, model=EMBEDDING_MODEL)
|
|
return json.dumps(vec).encode(), EMBEDDING_MODEL
|
|
except Exception:
|
|
logger.exception("create_assessment_embedding failed")
|
|
return None, None
|
|
|
|
|
|
def cosine_similarity(a: list[float], b: list[float]) -> float:
|
|
"""Calculate cosine similarity between two vectors."""
|
|
dot = sum(x * y for x, y in zip(a, b))
|
|
norm_a = sum(x * x for x in a) ** 0.5
|
|
norm_b = sum(x * x for x in b) ** 0.5
|
|
if norm_a == 0 or norm_b == 0:
|
|
return 0.0
|
|
return dot / (norm_a * norm_b)
|
|
|
|
|
|
def find_relevant_chunks(
|
|
query: str,
|
|
parteien: list[str] = None,
|
|
typ: str = None,
|
|
bundesland: str = None,
|
|
top_k: int = 3,
|
|
min_similarity: float = 0.5,
|
|
datum: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""Find most relevant chunks for a query.
|
|
|
|
Args:
|
|
bundesland: Wenn gesetzt, werden nur Chunks dieses Bundeslands ODER
|
|
globale Chunks (bundesland IS NULL, z.B. Grundsatzprogramme)
|
|
berücksichtigt. Wenn None, kein Filter.
|
|
datum: ISO-Datum (YYYY-MM-DD). Wenn gesetzt, werden nur Chunks
|
|
zurückgegeben, deren ``programm_id`` in einem Programm liegt,
|
|
dessen Geltungszeitraum [gueltig_ab, gueltig_bis) das Datum
|
|
enthält. Damit erfolgen historische Bewertungen gegen das
|
|
zeitpunkt-richtige Programm (ADR 0013). Wenn None: alle
|
|
Programme (gegenwärtig und vergangen) durchsuchbar — Default
|
|
für Rückwärtskompatibilität.
|
|
"""
|
|
|
|
# Query-Embedding muss im selben Vektorraum wie die gespeicherten Chunks
|
|
# liegen — während der Migration #123 ist das EMBEDDING_MODEL_READ.
|
|
query_embedding = create_embedding(query, model=EMBEDDING_MODEL_READ)
|
|
|
|
conn = sqlite3.connect(EMBEDDINGS_DB)
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
# Build query — filtert auf das aktive READ-Modell, damit v3- und
|
|
# v4-Embeddings nicht gemischt werden (Cosine wäre Nonsens).
|
|
sql = "SELECT * FROM chunks WHERE model = ?"
|
|
params = [EMBEDDING_MODEL_READ]
|
|
|
|
if parteien:
|
|
placeholders = ",".join("?" * len(parteien))
|
|
sql += f" AND partei IN ({placeholders})"
|
|
params.extend(parteien)
|
|
|
|
if typ:
|
|
sql += " AND typ = ?"
|
|
params.append(typ)
|
|
|
|
if bundesland:
|
|
# Bundesland-spezifische ODER globale Chunks (Grundsatzprogramme).
|
|
sql += " AND (bundesland = ? OR bundesland IS NULL)"
|
|
params.append(bundesland)
|
|
|
|
if datum:
|
|
# Welche programm_ids gelten zu diesem Datum? Pre-compute aus PROGRAMME.
|
|
valid_pids = []
|
|
for pid, info in PROGRAMME.items():
|
|
ab = info.get("gueltig_ab")
|
|
if not ab:
|
|
continue
|
|
bis = info.get("gueltig_bis")
|
|
if datum < ab:
|
|
continue
|
|
if bis is not None and datum >= bis:
|
|
continue
|
|
valid_pids.append(pid)
|
|
if valid_pids:
|
|
placeholders = ",".join("?" * len(valid_pids))
|
|
sql += f" AND programm_id IN ({placeholders})"
|
|
params.extend(valid_pids)
|
|
else:
|
|
# Kein Programm gilt zu diesem Datum — leere Resultmenge.
|
|
conn.close()
|
|
return []
|
|
|
|
rows = conn.execute(sql, params).fetchall()
|
|
conn.close()
|
|
|
|
# Calculate similarities
|
|
results = []
|
|
for row in rows:
|
|
chunk_embedding = json.loads(row["embedding"])
|
|
similarity = cosine_similarity(query_embedding, chunk_embedding)
|
|
|
|
if similarity >= min_similarity:
|
|
results.append({
|
|
"programm_id": row["programm_id"],
|
|
"partei": row["partei"],
|
|
"typ": row["typ"],
|
|
"seite": row["seite"],
|
|
"text": row["text"],
|
|
"similarity": similarity,
|
|
})
|
|
|
|
# Sort by similarity and return top_k
|
|
results.sort(key=lambda x: x["similarity"], reverse=True)
|
|
return results[:top_k]
|
|
|
|
|
|
def get_relevant_quotes_for_antrag(
|
|
antrag_text: str,
|
|
fraktionen: list[str],
|
|
bundesland: str,
|
|
top_k_per_partei: int = 2,
|
|
datum: Optional[str] = None,
|
|
) -> dict[str, list[dict]]:
|
|
"""Get relevant quotes from Wahl- and Parteiprogramme for an Antrag.
|
|
|
|
Args:
|
|
bundesland: Pflicht. Bestimmt, welche Wahlprogramme durchsucht werden
|
|
und welche Regierungsfraktionen zusätzlich zu den Antragstellern
|
|
einbezogen werden.
|
|
datum: ISO-Datum des Antrags. Wenn gesetzt, werden nur Programme
|
|
durchsucht, deren Geltungszeitraum [gueltig_ab, gueltig_bis)
|
|
das Datum enthält — historische Anträge werden gegen das
|
|
zeitpunkt-richtige Programm bewertet (ADR 0013). Wenn None:
|
|
alle Programme dieser Partei (Default — Rückwärtskompat).
|
|
"""
|
|
# Lokaler Import vermeidet Zirkularität: bundeslaender.py importiert nichts
|
|
# aus diesem Modul, aber der saubere Trennstrich bleibt erhalten.
|
|
from .bundeslaender import BUNDESLAENDER
|
|
|
|
if bundesland not in BUNDESLAENDER:
|
|
raise ValueError(f"Unbekanntes Bundesland: {bundesland}")
|
|
|
|
regierungsfraktionen = BUNDESLAENDER[bundesland].regierungsfraktionen
|
|
parteien_to_search = list(dict.fromkeys(fraktionen + regierungsfraktionen)) # dedupe, Reihenfolge stabil
|
|
|
|
results = {}
|
|
|
|
from .parteien import normalize_partei
|
|
|
|
for partei in parteien_to_search:
|
|
# Kanonischer Lookup-Key über den zentralen Mapper (#55). Ersetzt
|
|
# den alten Hack ``partei.upper() if partei != "GRÜNE" else "GRÜNE"``,
|
|
# der nur die Schreibweisen-Drift in einer einzigen Partei
|
|
# abgefangen hat. Wenn der Mapper nichts findet, fallen wir auf
|
|
# den Originalstring zurück — die DB-Lookup-Schicht macht ohnehin
|
|
# eigene Case-insensitive-Vergleiche.
|
|
canonical = normalize_partei(partei, bundesland=bundesland)
|
|
partei_lookup = canonical or partei
|
|
|
|
# Wahlprogramm — bundesland-gefiltert + ggf. zeitpunkt-gefiltert
|
|
wahl_chunks = find_relevant_chunks(
|
|
antrag_text,
|
|
parteien=[partei_lookup],
|
|
typ="wahlprogramm",
|
|
bundesland=bundesland,
|
|
top_k=top_k_per_partei,
|
|
min_similarity=0.35,
|
|
datum=datum,
|
|
)
|
|
|
|
# Parteiprogramm (Grundsatz, federal — bundesland=NULL matched implizit).
|
|
# Hier wird ``datum`` ebenfalls weitergereicht: zum Antragszeitpunkt
|
|
# noch nicht gültige Grundsatzprogramme (z.B. cdu-grundsatz von 2024
|
|
# bei einem Antrag aus 2010) sollen nicht zitiert werden.
|
|
partei_chunks = find_relevant_chunks(
|
|
antrag_text,
|
|
parteien=[partei_lookup],
|
|
typ="parteiprogramm",
|
|
bundesland=bundesland,
|
|
top_k=top_k_per_partei,
|
|
min_similarity=0.35,
|
|
datum=datum,
|
|
)
|
|
|
|
if wahl_chunks or partei_chunks:
|
|
results[partei_lookup] = {
|
|
"wahlprogramm": wahl_chunks,
|
|
"parteiprogramm": partei_chunks,
|
|
}
|
|
|
|
return results
|
|
|
|
|
|
def _chunk_source_label(chunk: dict) -> str:
|
|
"""Build a fully-qualified source label like 'FDP MV Wahlprogramm 2021, S. 73'.
|
|
|
|
Without the programme name + Bundesland in the prompt, the LLM
|
|
halluzinates familiar sources from its training (typically NRW 2022)
|
|
even when the retrieved chunks all come from a different state.
|
|
"""
|
|
prog_id = chunk.get("programm_id", "")
|
|
info = PROGRAMME.get(prog_id, {})
|
|
name = info.get("name") or prog_id
|
|
seite = chunk.get("seite", "?")
|
|
return f"{name}, S. {seite}"
|
|
|
|
|
|
def _chunk_pdf_url(chunk: dict) -> Optional[str]:
|
|
"""Build the canonical PDF URL with page anchor for a chunk.
|
|
|
|
Wenn der Chunk einen ``text`` enthält, wird stattdessen die
|
|
Highlight-Endpoint-URL ``/api/wahlprogramm-cite?pid=…&seite=…&q=…``
|
|
emittiert (Issue #47). Der Endpoint rendert die Wahlprogramm-Seite
|
|
mit gelb markiertem Zitat und liefert ein 1-Seiten-PDF. Klick im
|
|
Report öffnet die Quelle direkt mit visuell hervorgehobener Stelle.
|
|
|
|
Fallback: ohne text → statische ``/static/referenzen/<pdf>#page=<n>``
|
|
URL (rückwärts-kompatibel für Pre-#47 Assessments).
|
|
"""
|
|
prog_id = chunk.get("programm_id", "")
|
|
info = PROGRAMME.get(prog_id)
|
|
if not info:
|
|
return None
|
|
pdf = info.get("pdf")
|
|
if not pdf:
|
|
return None
|
|
seite = chunk.get("seite")
|
|
text = (chunk.get("text") or "").strip()
|
|
|
|
if text and seite:
|
|
# Highlight-Endpoint mit URL-encoded query. Den Text auf 200 Zeichen
|
|
# abschneiden — search_for matched ohnehin nur Substring-Anker, und
|
|
# die URL bleibt bounded (sonst würden 500-Zeichen-Snippets in jeder
|
|
# Zitat-URL stehen und das HTML-Report-JSON aufblähen).
|
|
q = urllib.parse.quote_plus(text[:200])
|
|
return f"/api/wahlprogramm-cite?pid={prog_id}&seite={seite}&q={q}#page={seite}"
|
|
|
|
if seite:
|
|
return f"/static/referenzen/{pdf}#page={seite}"
|
|
return f"/static/referenzen/{pdf}"
|
|
|
|
|
|
def render_highlighted_page(programm_id: str, seite: int, query: str) -> Optional[bytes]:
|
|
"""Render a single Wahlprogramm-page with yellow highlights for a query.
|
|
|
|
Used by the ``/api/wahlprogramm-cite`` endpoint to serve a one-page
|
|
PDF where the cited snippet is visually highlighted via PyMuPDF
|
|
``add_highlight_annot``. Returns the serialized PDF bytes, or None
|
|
if the programme/page can't be resolved.
|
|
|
|
Returns a tuple ``(pdf_bytes, found_page, highlighted)`` where
|
|
``found_page`` is the 1-indexed page number and ``highlighted`` is
|
|
True if the text was found and annotated. Returns ``(None, 0, False)``
|
|
if the programme/page can't be resolved.
|
|
|
|
Args:
|
|
programm_id: Key into PROGRAMME registry — validated by caller.
|
|
seite: 1-indexed page number within the programme PDF.
|
|
query: Snippet text to search and highlight on the page. Long
|
|
queries are truncated to the first 200 characters before the
|
|
search; PyMuPDF's ``search_for`` falls over on huge needles
|
|
anyway and a short anchor is what we want for the visual hit.
|
|
"""
|
|
info = PROGRAMME.get(programm_id)
|
|
if not info:
|
|
return None, 0, False
|
|
pdf_filename = info.get("pdf")
|
|
if not pdf_filename:
|
|
return None, 0, False
|
|
|
|
referenzen = Path(__file__).parent / "static" / "referenzen"
|
|
pdf_path = referenzen / pdf_filename
|
|
if not pdf_path.exists():
|
|
return None, 0, False
|
|
|
|
needle = (query or "").strip()[:200]
|
|
|
|
try:
|
|
src = fitz.open(str(pdf_path))
|
|
except Exception:
|
|
# Manche PDFs (z.B. CDU Grundsatzprogramm 2007) lassen sich nicht
|
|
# mit PyMuPDF öffnen. Fallback: Original-PDF ohne Highlighting.
|
|
logger.exception("render_highlighted_page: kann %s nicht öffnen", pdf_path.name)
|
|
return pdf_path.read_bytes(), seite, False
|
|
|
|
try:
|
|
if seite < 1 or seite > len(src):
|
|
return None, 0, False
|
|
|
|
# Suche den Needle auf der angegebenen Seite. Falls dort nichts
|
|
# gefunden wird (Pre-#60-Assessments haben oft falsche Seiten-
|
|
# nummern), durchsuchen wir ALLE Seiten und nehmen die erste
|
|
# mit einem Treffer — so funktioniert Highlighting auch bei
|
|
# halluzinierten Seitenzahlen retroaktiv.
|
|
target_page_idx = seite - 1
|
|
rects = []
|
|
if needle:
|
|
clean = needle.replace("\u00ad", "")
|
|
# LLMs ziehen h\u00e4ufig die Seitenzahl-Header (\u201e44 Gute Bildung \u2026")
|
|
# mit ins Zitat. Wenn die ersten Tokens reine Ziffern sind,
|
|
# strippen wir sie f\u00fcr die Suche \u2014 sonst matched search_for nicht.
|
|
import re as _re
|
|
clean = _re.sub(r"^\s*\d+\s+", "", clean).strip()
|
|
words = clean.split()
|
|
anchor = " ".join(words[:5]) if len(words) >= 5 else clean
|
|
# Versuch 1: angegebene Seite, Volltext (gestrippt)
|
|
rects = src[target_page_idx].search_for(clean)
|
|
# Versuch 2: angegebene Seite, 5-Wort-Anker
|
|
if not rects:
|
|
rects = src[target_page_idx].search_for(anchor)
|
|
# Versuch 3: alle Seiten durchsuchen
|
|
if not rects:
|
|
for i in range(len(src)):
|
|
rects = src[i].search_for(anchor)
|
|
if rects:
|
|
target_page_idx = i
|
|
break
|
|
|
|
# Volles PDF mit Highlight-Annotation.
|
|
page = src[target_page_idx]
|
|
if needle and rects:
|
|
for rect in rects:
|
|
annot = page.add_highlight_annot(rect)
|
|
if annot is not None:
|
|
annot.set_colors(stroke=(1.0, 0.93, 0.0)) # gelb
|
|
annot.update()
|
|
|
|
# PDF-OpenAction setzen, damit der Reader direkt auf der richtigen
|
|
# Seite startet (statt Seite 1) — sonst sieht der User „PDF öffnet,
|
|
# aber falsche Seite". /Fit = passt-zur-Größe.
|
|
try:
|
|
page_xref = page.xref
|
|
catalog_xref = src.pdf_catalog()
|
|
src.xref_set_key(catalog_xref, "OpenAction", f"[{page_xref} 0 R /Fit]")
|
|
except Exception:
|
|
logger.exception("render_highlighted_page: OpenAction-Setzen fehlgeschlagen")
|
|
|
|
highlighted = bool(needle and rects)
|
|
try:
|
|
return src.tobytes(), target_page_idx + 1, highlighted
|
|
except (AssertionError, Exception):
|
|
# PyMuPDF kann manche PDFs nicht serialisieren (z.B. CDU 2007).
|
|
# Fallback: Original-PDF ohne Annotations zurückgeben.
|
|
logger.warning("render_highlighted_page: tobytes() failed für %s, sende Original", pdf_path.name)
|
|
return pdf_path.read_bytes(), target_page_idx + 1, False
|
|
finally:
|
|
src.close()
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Citation post-processing — Issue #60 Option B
|
|
#
|
|
# Pre-#60 the LLM was free to fabricate `quelle`/`url` strings even when the
|
|
# `text` was a real snippet from a retrieved chunk. The A+C fix made the
|
|
# prompt more strict, but BB 8/673 (post-deploy) showed the LLM still
|
|
# cross-mixed: it copied text from chunk Qn but wrote the page from chunk Qm
|
|
# in the `quelle` field.
|
|
#
|
|
# The structural fix is to take quelle/url generation away from the LLM
|
|
# entirely. After the LLM responds, we walk over every Zitat and try to
|
|
# locate its `text` (substring or 5-word anchor) in any of the chunks the
|
|
# LLM was actually shown. If we find a match, we *overwrite* quelle and url
|
|
# with the canonical values from that chunk. If we don't find a match, the
|
|
# Zitat is dropped — it cannot be backed by retrieved evidence.
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
_RE_WHITESPACE = re.compile(r"\s+")
|
|
_RE_HYPHEN_BREAK = re.compile(r"(\w)-\s+(\w)")
|
|
_RE_TRUNCATION = re.compile(r"^\s*\.{2,}|\.{2,}\s*$")
|
|
|
|
|
|
def _normalize_for_match(text: str) -> str:
|
|
"""Lowercase, collapse whitespace, bridge soft-hyphen line breaks.
|
|
|
|
Mirrors the matcher used in tests/integration/test_citations_substring.py
|
|
so that the analyzer's post-processing and Sub-D's verification stay in
|
|
lockstep.
|
|
"""
|
|
s = (text or "").lower()
|
|
s = _RE_TRUNCATION.sub("", s)
|
|
s = s.replace("\u00ad", "") # soft hyphen
|
|
s = _RE_WHITESPACE.sub(" ", s).strip()
|
|
prev = None
|
|
while prev != s:
|
|
prev = s
|
|
s = _RE_HYPHEN_BREAK.sub(r"\1\2", s)
|
|
return s
|
|
|
|
|
|
def find_chunk_for_text(text: str, chunks: list[dict]) -> Optional[dict]:
|
|
"""Locate the retrieved chunk that a Zitat snippet was copied from.
|
|
|
|
Two-stage match identical to Sub-D:
|
|
1. **Strict substring** — full needle as substring of any chunk.
|
|
2. **5-word anchor** — any 5 consecutive words of the needle as
|
|
substring of any chunk.
|
|
|
|
Snippets shorter than 20 characters are rejected (too weak to bind).
|
|
Returns the matching chunk dict, or None.
|
|
"""
|
|
needle = _normalize_for_match(text)
|
|
if len(needle) < 20:
|
|
return None
|
|
chunks_norm = [(c, _normalize_for_match(c.get("text", ""))) for c in chunks]
|
|
for c, norm in chunks_norm:
|
|
if needle in norm:
|
|
return c
|
|
words = needle.split()
|
|
if len(words) < 4:
|
|
return None
|
|
for i in range(len(words) - 3):
|
|
anchor = " ".join(words[i:i + 4])
|
|
for c, norm in chunks_norm:
|
|
if anchor in norm:
|
|
return c
|
|
return None
|
|
|
|
|
|
def reconstruct_zitate(data: dict, semantic_quotes: dict) -> dict:
|
|
"""Verify and reconstruct LLM-emitted zitate against retrieved chunks.
|
|
|
|
Matching ist strikt **partei-skopiert** — ein Zitat im AfD-Block darf
|
|
nur gegen AfD-Chunks gematcht werden, niemals gegen CDU/SPD-Chunks.
|
|
Sonst landet ein zufaellig wortgleicher Text aus einem fremden Programm
|
|
mit fremder ``quelle`` im falschen Block (Cross-Partei-Misattribution).
|
|
|
|
Match-Reihenfolge pro Zitat:
|
|
1. Partei + exakte Programm-Kategorie (z.B. AfD-Parteiprogramm-Chunks
|
|
fuer ein Zitat im AfD-Parteiprogramm-Block) → ``verified: true`` mit
|
|
kanonischer ``quelle``/``url`` aus dem Chunk; Zitat bleibt im Block.
|
|
2. Partei + andere Programm-Kategorie (z.B. der LLM hat einen
|
|
Grundsatzprogramm-Text in den Wahlprogramm-Block geschrieben) →
|
|
``verified: true`` mit korrigierter ``quelle``, **Zitat wandert in
|
|
den passenden Block**, sodass `wahlprogramm.zitate` nur Zitate aus
|
|
Wahlprogrammen enthaelt und `parteiprogramm.zitate` nur Zitate aus
|
|
Grundsatz-/Parteiprogrammen.
|
|
3. Kein Match in der eigenen Partei → **Zitat verwerfen**. Lieber 0
|
|
Zitate als eines mit falscher Partei-Zuschreibung. Vorher wurde
|
|
solche Zitate als ``verified: false`` mit der LLM-quelle behalten —
|
|
das fuehrte z.B. zu CDU-quellen in AfD-Bloecken (#175-bug).
|
|
"""
|
|
if not semantic_quotes:
|
|
return data
|
|
|
|
# Pool pro Partei aufbauen — Lookup geht direkt + ueber normalize_partei,
|
|
# damit Aliase ("BÜNDNIS 90/DIE GRÜNEN" ↔ "GRÜNE") beidseitig matchen.
|
|
chunks_by_party: dict[str, dict[str, list]] = {}
|
|
for partei, d in (semantic_quotes or {}).items():
|
|
chunks_by_party[partei] = {
|
|
"wahlprogramm": list(d.get("wahlprogramm", []) or []),
|
|
"parteiprogramm": list(d.get("parteiprogramm", []) or []),
|
|
}
|
|
if not chunks_by_party:
|
|
return data
|
|
|
|
try:
|
|
from .parteien import normalize_partei
|
|
except Exception:
|
|
normalize_partei = lambda x: x # noqa: E731
|
|
|
|
def _pool_for(fraktion: str) -> dict[str, list]:
|
|
# Versuch direkt, dann normalisiert. Wenn weder noch — leerer Pool.
|
|
if fraktion in chunks_by_party:
|
|
return chunks_by_party[fraktion]
|
|
norm = normalize_partei(fraktion) or fraktion
|
|
if norm in chunks_by_party:
|
|
return chunks_by_party[norm]
|
|
# Reverse-Lookup: vielleicht ist `chunks_by_party` mit normalisiertem
|
|
# Key bestueckt waehrend `fraktion` der Original-Name ist.
|
|
for key, val in chunks_by_party.items():
|
|
if normalize_partei(key) == norm:
|
|
return val
|
|
return {"wahlprogramm": [], "parteiprogramm": []}
|
|
|
|
for fs in data.get("wahlprogrammScores", []) or []:
|
|
partei_name = fs.get("fraktion", "")
|
|
partei_pool = _pool_for(partei_name)
|
|
|
|
# Zwei-Pass-Verarbeitung: erst alle Zitate sammeln und nach
|
|
# tatsaechlichem Programmtyp klassifizieren (basierend auf welchem
|
|
# Chunk-Pool sie matchen), dann erst in die Bloecke schreiben.
|
|
# Damit landet ein Zitat aus dem Grundsatzprogramm immer im
|
|
# parteiprogramm-Block, auch wenn der LLM es ins wahlprogramm
|
|
# gepackt hatte.
|
|
classified: dict[str, list] = {"wahlprogramm": [], "parteiprogramm": []}
|
|
|
|
for kind in ("wahlprogramm", "parteiprogramm"):
|
|
blk = fs.get(kind) or {}
|
|
zitate = blk.get("zitate") or []
|
|
primary = partei_pool.get(kind) or []
|
|
cross_kind = "parteiprogramm" if kind == "wahlprogramm" else "wahlprogramm"
|
|
secondary = partei_pool.get(cross_kind) or []
|
|
|
|
for z in zitate:
|
|
text = z.get("text", "") or ""
|
|
|
|
# 1. Match im Pool, der zum vom LLM gewaehlten Block passt
|
|
matched = find_chunk_for_text(text, primary) if primary else None
|
|
target_kind = kind
|
|
if matched is None and secondary:
|
|
# 2. Fallback: gleiche Partei, andere Kategorie —
|
|
# Zitat wandert in den passenden Block.
|
|
matched = find_chunk_for_text(text, secondary)
|
|
if matched is not None:
|
|
target_kind = cross_kind
|
|
|
|
if matched is None:
|
|
# 3. Kein Match in der eigenen Partei → verwerfen.
|
|
logger.warning(
|
|
"Zitat verworfen (kein Partei-Match): fraktion=%r "
|
|
"kind=%r text=%r llm_quelle=%r",
|
|
partei_name, kind, text[:80], z.get("quelle"),
|
|
)
|
|
continue
|
|
|
|
z["quelle"] = _chunk_source_label(matched)
|
|
url = _chunk_pdf_url(matched)
|
|
if url:
|
|
z["url"] = url
|
|
z["verified"] = True
|
|
classified[target_kind].append(z)
|
|
|
|
# Klassifizierte Zitate in die jeweils passenden Bloecke schreiben.
|
|
for kind in ("wahlprogramm", "parteiprogramm"):
|
|
blk = fs.get(kind) or {}
|
|
blk["zitate"] = classified[kind]
|
|
fs[kind] = blk
|
|
|
|
return data
|
|
|
|
|
|
def format_quotes_for_prompt(
|
|
quotes: dict,
|
|
searched_parties: Optional[list[str]] = None,
|
|
) -> str:
|
|
"""Format quotes for inclusion in LLM prompt.
|
|
|
|
Each chunk gets a stable ENUM-ID ([Q1], [Q2], …) and the prompt
|
|
instructs the LLM to anchor every citation in one of those IDs and
|
|
to copy the snippet **verbatim** from the cited chunk. This is the
|
|
structural fix for Issue #60: pre-#60 the LLM was free to invent
|
|
snippets under real source labels because nothing in the prompt
|
|
bound a citation to a specific retrieved chunk.
|
|
|
|
Each quote is annotated with the fully-qualified source (programme
|
|
name + page) so the LLM cannot fall back on training-set defaults
|
|
when constructing its citations.
|
|
|
|
Issue #63 erweitert: wenn ``searched_parties`` übergeben wird, werden
|
|
Parteien, für die **kein** Chunk retrievt wurde, im Prompt explizit
|
|
als "keine Quellen im Index" markiert. Das LLM wird angewiesen, für
|
|
diese Parteien ``score: null`` zu setzen statt aus dem Trainingswissen
|
|
zu raten.
|
|
"""
|
|
if not quotes and not searched_parties:
|
|
return ""
|
|
|
|
lines = ["\n## Relevante Passagen aus Wahl- und Parteiprogrammen\n"]
|
|
lines.append(
|
|
"**ZITATEREGEL** — verbindlich für alle Zitate in `wahlprogramm`/"
|
|
"`parteiprogramm`-Blöcken:\n"
|
|
"1. Jedes Zitat MUSS auf genau einen der unten aufgelisteten "
|
|
"Chunks verweisen (Format `[Q1]`, `[Q2]`, …).\n"
|
|
"2. Der `text`-String MUSS eine **wörtliche, zusammenhängende** "
|
|
"Passage von mindestens 5 Wörtern aus genau diesem Chunk sein — "
|
|
"keine Paraphrasen, keine Zusammenfassungen, keine "
|
|
"Cross-References aus dem Gedächtnis.\n"
|
|
"3. Der `quelle`-String MUSS exakt das Source-Label des "
|
|
"gewählten Chunks sein (Programm-Name + Seitenzahl, wie unten "
|
|
"ausgeschrieben).\n"
|
|
"4. Wenn kein Chunk wirklich passt: lass das Zitat-Array leer. "
|
|
"Lieber 0 Zitate als ein erfundenes Zitat.\n"
|
|
"5. **Wenn für eine Fraktion unten KEINE QUELLEN VORHANDEN "
|
|
"steht**: setze `score: 0` für `wahlprogramm` UND "
|
|
"`parteiprogramm` dieser Fraktion und schreibe in die "
|
|
"`begründung`: 'Keine Quellen im Index — Bewertung nicht "
|
|
"möglich.' Erfinde KEINEN Score aus dem Trainingswissen.\n"
|
|
)
|
|
|
|
counter = 0
|
|
for partei, data in quotes.items():
|
|
lines.append(f"\n### {partei}\n")
|
|
|
|
if data.get("wahlprogramm"):
|
|
lines.append("**Wahlprogramm:**")
|
|
for chunk in data["wahlprogramm"]:
|
|
counter += 1
|
|
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
|
|
lines.append(f'- [Q{counter}] {_chunk_source_label(chunk)}: "{text}"')
|
|
|
|
if data.get("parteiprogramm"):
|
|
lines.append("\n**Grundsatzprogramm:**")
|
|
for chunk in data["parteiprogramm"]:
|
|
counter += 1
|
|
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
|
|
lines.append(f'- [Q{counter}] {_chunk_source_label(chunk)}: "{text}"')
|
|
|
|
# Issue #63: Parteien ohne jegliche retrievte Chunks explizit markieren,
|
|
# damit das LLM nicht aus Trainingswissen halluziniert.
|
|
if searched_parties:
|
|
parties_with_chunks = set(quotes.keys())
|
|
missing = [p for p in searched_parties if p not in parties_with_chunks]
|
|
if missing:
|
|
lines.append("\n### KEINE QUELLEN VORHANDEN\n")
|
|
lines.append(
|
|
"Für folgende Fraktionen sind weder Wahl- noch "
|
|
"Grundsatzprogramm-Passagen im Index vorhanden. "
|
|
"Bewerte sie mit `score: 0` und `zitate: []`:\n"
|
|
)
|
|
for p in missing:
|
|
lines.append(f"- **{p}**: KEINE QUELLEN — score 0, keine Zitate.")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def get_programme_info() -> list[dict]:
|
|
"""Get list of all indexed programmes with metadata."""
|
|
info_list = []
|
|
|
|
for prog_id, info in PROGRAMME.items():
|
|
info_list.append({
|
|
"id": prog_id,
|
|
"name": info["name"],
|
|
"typ": info["typ"],
|
|
"partei": info["partei"],
|
|
"bundesland": info.get("bundesland"),
|
|
"pdf": info["pdf"],
|
|
"pdf_url": f"/static/referenzen/{info['pdf']}",
|
|
})
|
|
|
|
return info_list
|
|
|
|
|
|
def get_indexing_status() -> dict:
|
|
"""Get status of indexed programmes."""
|
|
if not EMBEDDINGS_DB.exists():
|
|
return {"indexed": 0, "programmes": []}
|
|
|
|
conn = sqlite3.connect(EMBEDDINGS_DB)
|
|
|
|
# Count chunks per program
|
|
rows = conn.execute("""
|
|
SELECT programm_id, COUNT(*) as chunks
|
|
FROM chunks
|
|
GROUP BY programm_id
|
|
""").fetchall()
|
|
|
|
conn.close()
|
|
|
|
indexed = {row[0]: row[1] for row in rows}
|
|
|
|
programmes = []
|
|
for prog_id, info in PROGRAMME.items():
|
|
programmes.append({
|
|
"id": prog_id,
|
|
"name": info["name"],
|
|
"partei": info["partei"],
|
|
"chunks": indexed.get(prog_id, 0),
|
|
"indexed": prog_id in indexed,
|
|
})
|
|
|
|
return {
|
|
"indexed": len(indexed),
|
|
"total": len(PROGRAMME),
|
|
"programmes": programmes,
|
|
}
|