gwoe-antragspruefer/app/embeddings.py
Dotty Dotter ee0218b5af Refactor wahlprogramme/embeddings/analyzer for multi-state (#5)
Atomic refactor of the three modules that previously hardcoded NRW
behaviour. After this commit, every analysis path consults the central
BUNDESLAENDER registry for governing fractions, parliament name, and
state metadata.

wahlprogramme.py
- WAHLPROGRAMME is now nested {bundesland: {partei: meta}}; NRW data
  hoisted unchanged under the "NRW" key.
- New WAHLPROGRAMM_KONTEXT_FILES dict maps a state to its overview
  markdown file (currently only NRW).
- find_relevant_quotes(text, fraktionen, bundesland) — bundesland is
  now a required positional. Governing fractions for the requested
  state are merged with the submitting fractions before lookup.
- Helpers get_wahlprogramm() and parteien_mit_wahlprogramm() expose
  the new shape to other modules.
- ValueError on unknown bundesland (no silent fallback).

embeddings.py
- Schema migration in init_embeddings_db: adds a `bundesland` column
  to the chunks table when missing, plus an index, and backfills
  existing rows from the PROGRAMME registry. Grundsatzprogramme
  (federal level) keep bundesland NULL by design.
- find_relevant_chunks accepts a bundesland filter that matches state
  rows OR NULL — so federal Grundsatzprogramme remain visible to every
  analysis.
- get_relevant_quotes_for_antrag(text, fraktionen, bundesland, …) —
  bundesland required, governing fractions read from BUNDESLAENDER
  instead of hardcoded ["CDU","GRÜNE"]. Order-preserving dedup
  replaces the previous set-based merge.
- index_programm now writes the bundesland column on insert.
- Dropped the hardcoded "Wahlprogramm NRW 2022" label in
  format_quotes_for_prompt — bundesland context is implicit in the
  surrounding prompt block.

analyzer.py
- get_bundesland_context reads parlament_name, regierungsfraktionen,
  landtagsfraktionen and the optional WAHLPROGRAMM_KONTEXT_FILES entry
  from the central registry. Raises ValueError on unknown OR inactive
  bundesland — kills the silent NRW fallback that previously masked
  configuration gaps.
- The Antragsteller-detection heuristic now iterates
  BUNDESLAENDER[bundesland].landtagsfraktionen instead of
  WAHLPROGRAMME.keys(), so we recognise parties for which we don't
  yet have a Wahlprogramm PDF.
- Both quote lookups (semantic + keyword fallback) now receive the
  bundesland.

Resolves issue #5. Foundation for #2 (LSA), #3 (Berlin), #4 (MV).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 18:48:11 +02:00

443 lines
13 KiB
Python

"""Semantic search for Wahlprogramme and Parteiprogramme using Qwen embeddings."""
import json
import sqlite3
from pathlib import Path
from typing import Optional
import fitz # PyMuPDF
from openai import OpenAI
from .config import settings
# Embedding model name and vector size; both are sent with every embedding
# request issued by this module.
EMBEDDING_MODEL = "text-embedding-v3"
EMBEDDING_DIMENSIONS = 1024
# Path of the SQLite file that stores the indexed chunks and their embeddings.
EMBEDDINGS_DB = settings.data_dir / "embeddings.db"
# Registry of all programme PDFs known to this module, keyed by programme id.
# Every entry carries "name", "typ" ("wahlprogramm" or "parteiprogramm"),
# "partei" and the "pdf" file name. State election programmes additionally
# carry "bundesland"; the federal Grundsatzprogramme deliberately omit that
# key, which the indexing code stores as NULL.
PROGRAMME = {
    # State election programmes, NRW 2022
    "spd-nrw-2022": {
        "name": "SPD NRW Wahlprogramm 2022",
        "typ": "wahlprogramm",
        "partei": "SPD",
        "bundesland": "NRW",
        "pdf": "spd-nrw-2022.pdf",
    },
    "cdu-nrw-2022": {
        "name": "CDU NRW Wahlprogramm 2022",
        "typ": "wahlprogramm",
        "partei": "CDU",
        "bundesland": "NRW",
        "pdf": "cdu-nrw-2022.pdf",
    },
    "gruene-nrw-2022": {
        "name": "Grüne NRW Wahlprogramm 2022",
        "typ": "wahlprogramm",
        "partei": "GRÜNE",
        "bundesland": "NRW",
        "pdf": "gruene-nrw-2022.pdf",
    },
    "fdp-nrw-2022": {
        "name": "FDP NRW Wahlprogramm 2022",
        "typ": "wahlprogramm",
        "partei": "FDP",
        "bundesland": "NRW",
        "pdf": "fdp-nrw-2022.pdf",
    },
    "afd-nrw-2022": {
        "name": "AfD NRW Wahlprogramm 2022",
        "typ": "wahlprogramm",
        "partei": "AfD",
        "bundesland": "NRW",
        "pdf": "afd-nrw-2022.pdf",
    },
    # Party platform programmes (federal level, no "bundesland" key)
    "spd-grundsatz": {
        "name": "SPD Grundsatzprogramm 2007",
        "typ": "parteiprogramm",
        "partei": "SPD",
        "pdf": "spd-grundsatzprogramm.pdf",
    },
    "cdu-grundsatz": {
        "name": "CDU Grundsatzprogramm 2007",
        "typ": "parteiprogramm",
        "partei": "CDU",
        "pdf": "cdu-grundsatzprogramm.pdf",
    },
    "gruene-grundsatz": {
        "name": "Grüne Grundsatzprogramm 2020",
        "typ": "parteiprogramm",
        "partei": "GRÜNE",
        "pdf": "gruene-grundsatzprogramm.pdf",
    },
    "fdp-grundsatz": {
        "name": "FDP Grundsatzprogramm 2012",
        "typ": "parteiprogramm",
        "partei": "FDP",
        "pdf": "fdp-grundsatzprogramm.pdf",
    },
}
def init_embeddings_db():
    """Create the ``chunks`` table and bring an existing one up to date.

    Includes a forward-only migration step (Issue #5): the ``bundesland``
    column is added when it is missing, an index on it is ensured, and rows
    whose ``bundesland`` is still NULL are backfilled from the ``PROGRAMME``
    registry. Programmes without a ``bundesland`` entry (the federal
    Grundsatzprogramme) keep ``bundesland = NULL``; ``find_relevant_chunks``
    treats NULL as "matches any state".

    Every statement is idempotent, so the function may be called on each
    application start. The connection is closed even when a statement fails.
    """
    # sqlite3 does not create missing parent directories on its own.
    EMBEDDINGS_DB.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY,
                programm_id TEXT NOT NULL,
                partei TEXT NOT NULL,
                typ TEXT NOT NULL,
                seite INTEGER,
                text TEXT NOT NULL,
                embedding BLOB NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_partei ON chunks(partei)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_typ ON chunks(typ)")

        # Migration: add the bundesland column to tables created before #5.
        cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()}
        if "bundesland" not in cols:
            conn.execute("ALTER TABLE chunks ADD COLUMN bundesland TEXT")

        # Safe to run unconditionally: IF NOT EXISTS makes the index creation
        # a no-op on repeat runs, and the UPDATE only touches NULL rows.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_chunks_bundesland ON chunks(bundesland)"
        )

        # Backfill the state for existing rows from the PROGRAMME registry.
        # Grundsatzprogramme have no "bundesland" entry and stay NULL.
        for prog_id, info in PROGRAMME.items():
            bl = info.get("bundesland")
            if bl is not None:
                conn.execute(
                    "UPDATE chunks SET bundesland = ? WHERE programm_id = ? AND bundesland IS NULL",
                    (bl, prog_id),
                )
        conn.commit()
    finally:
        conn.close()
def get_client() -> OpenAI:
    """Build an ``OpenAI`` client configured with the DashScope settings.

    Returns:
        A new client using ``settings.dashscope_api_key`` and
        ``settings.dashscope_base_url``.
    """
    connection_options = {
        "api_key": settings.dashscope_api_key,
        "base_url": settings.dashscope_base_url,
    }
    return OpenAI(**connection_options)
def create_embedding(text: str) -> list[float]:
    """Request an embedding vector for ``text``.

    Args:
        text: The text to embed.

    Returns:
        The embedding of the first (and only requested) input, as delivered
        by the embeddings endpoint.
    """
    request = {
        "model": EMBEDDING_MODEL,
        "input": text,
        "dimensions": EMBEDDING_DIMENSIONS,
    }
    reply = get_client().embeddings.create(**request)
    first_item = reply.data[0]
    return first_item.embedding
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words each chunk shares with its predecessor;
            must be smaller than ``chunk_size``.

    Returns:
        The chunks in reading order, words joined by single spaces. Empty or
        whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size`` (the window would never advance).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if overlap >= chunk_size:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    start = 0
    while start < len(words):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once a chunk reaches the end of the text. Otherwise the next
        # window would hold only overlap words already in this chunk.
        if start + chunk_size >= len(words):
            break
        start += step
    return chunks
def extract_text_with_pages(pdf_path: Path) -> list[tuple[int, str]]:
    """Extract the text of every non-empty page of a PDF.

    Args:
        pdf_path: Location of the PDF file.

    Returns:
        ``(page_number, text)`` tuples with 1-based page numbers, in document
        order. Pages whose text is empty or whitespace-only are omitted.
    """
    doc = fitz.open(pdf_path)
    try:
        pages = []
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text()
            if text.strip():
                pages.append((page_num, text))
        return pages
    finally:
        # Release the document handle even if text extraction fails.
        doc.close()
def index_programm(programm_id: str, pdf_dir: Path) -> int:
    """Index a single programme PDF into the embeddings database.

    The PDF is split page by page into word chunks and each chunk of at least
    20 words is embedded. Only after all embeddings are computed are the
    programme's existing rows replaced, in one short transaction. A failure
    therefore never leaves a half-written index, and the database is not
    locked while the embedding requests run.

    Args:
        programm_id: Key into the ``PROGRAMME`` registry.
        pdf_dir: Directory containing the programme PDFs.

    Returns:
        Number of chunks written. ``0`` if the PDF is missing or if no chunk
        could be embedded.

    Raises:
        ValueError: If ``programm_id`` is not in ``PROGRAMME``.
    """
    if programm_id not in PROGRAMME:
        raise ValueError(f"Unknown program: {programm_id}")
    info = PROGRAMME[programm_id]
    pdf_path = pdf_dir / info["pdf"]
    if not pdf_path.exists():
        print(f"PDF not found: {pdf_path}")
        return 0

    rows = []
    failed = 0
    for page_num, page_text in extract_text_with_pages(pdf_path):
        for piece in chunk_text(page_text, chunk_size=400, overlap=50):
            if len(piece.split()) < 20:  # Skip tiny chunks
                continue
            try:
                embedding = create_embedding(piece)
            except Exception as e:
                # Best effort: one failed request must not abort the whole run.
                print(f"Error embedding chunk: {e}")
                failed += 1
                continue
            rows.append((
                programm_id,
                info["partei"],
                info["typ"],
                page_num,
                piece,
                json.dumps(embedding).encode(),
                info.get("bundesland"),  # NULL for Grundsatzprogramme
            ))

    if failed and not rows:
        # Every request failed (e.g. API outage): keep the existing index
        # rather than replacing it with nothing.
        print(f"No chunk of {programm_id} could be embedded; keeping existing index")
        return 0

    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        # The connection context manager commits on success and rolls back on
        # error, so delete and insert are applied atomically.
        with conn:
            conn.execute("DELETE FROM chunks WHERE programm_id = ?", (programm_id,))
            conn.executemany("""
                INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, rows)
    finally:
        conn.close()

    total_chunks = len(rows)
    print(f"Indexed {total_chunks} chunks from {programm_id}")
    return total_chunks
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine of the angle between vectors ``a`` and ``b``.

    Returns ``0.0`` when either vector has zero length.
    """
    inner_product = sum(p * q for p, q in zip(a, b))
    length_a = sum(p * p for p in a) ** 0.5
    length_b = sum(q * q for q in b) ** 0.5
    # A zero-length vector has no direction; report "no similarity".
    if 0 in (length_a, length_b):
        return 0.0
    return inner_product / (length_a * length_b)
def find_relevant_chunks(
    query: str,
    parteien: Optional[list[str]] = None,
    typ: Optional[str] = None,
    bundesland: Optional[str] = None,
    top_k: int = 3,
    min_similarity: float = 0.5,
) -> list[dict]:
    """Find the stored chunks most similar to ``query``.

    Args:
        query: Text to embed and compare against the stored chunks.
        parteien: If given and non-empty, only chunks of these parties are
            considered. Matching is exact against the stored ``partei`` value.
        typ: If given, only chunks of this programme type are considered.
        bundesland: If given, only chunks of this state OR global chunks
            (``bundesland IS NULL``, e.g. Grundsatzprogramme) are considered.
            If ``None``, no state filter is applied.
        top_k: Maximum number of results to return.
        min_similarity: Chunks with a lower cosine similarity are dropped.

    Returns:
        Up to ``top_k`` dicts with the keys ``programm_id``, ``partei``,
        ``typ``, ``seite``, ``text`` and ``similarity``, ordered by descending
        similarity.
    """
    query_embedding = create_embedding(query)

    # Only the filter shape is built dynamically; all values are bound as
    # parameters.
    sql = (
        "SELECT programm_id, partei, typ, seite, text, embedding "
        "FROM chunks WHERE 1=1"
    )
    params = []
    if parteien:
        placeholders = ",".join("?" * len(parteien))
        sql += f" AND partei IN ({placeholders})"
        params.extend(parteien)
    if typ:
        sql += " AND typ = ?"
        params.append(typ)
    if bundesland:
        # State-specific OR global chunks (Grundsatzprogramme).
        sql += " AND (bundesland = ? OR bundesland IS NULL)"
        params.append(bundesland)

    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        # Close the handle even if the query fails (e.g. table not yet created).
        conn.close()

    results = []
    for programm_id, partei, chunk_typ, seite, text, embedding_blob in rows:
        similarity = cosine_similarity(query_embedding, json.loads(embedding_blob))
        if similarity >= min_similarity:
            results.append({
                "programm_id": programm_id,
                "partei": partei,
                "typ": chunk_typ,
                "seite": seite,
                "text": text,
                "similarity": similarity,
            })

    # Best matches first, then cut to the requested number.
    results.sort(key=lambda item: item["similarity"], reverse=True)
    return results[:top_k]
def get_relevant_quotes_for_antrag(
    antrag_text: str,
    fraktionen: list[str],
    bundesland: str,
    top_k_per_partei: int = 2,
) -> dict[str, dict[str, list[dict]]]:
    """Collect relevant programme passages for an Antrag, grouped by party.

    Args:
        antrag_text: Text of the Antrag used as the search query.
        fraktionen: The submitting parliamentary groups.
        bundesland: Required. Selects which Wahlprogramme are searched and
            which governing fractions are searched in addition to the
            submitters.
        top_k_per_partei: Maximum number of passages per party and
            programme type.

    Returns:
        A mapping of party name to ``{"wahlprogramm": [...],
        "parteiprogramm": [...]}``. Parties without any hit are omitted.
        Party names use the spelling of the ``PROGRAMME`` registry when the
        party is known there, otherwise the upper-cased input.

    Raises:
        ValueError: If ``bundesland`` is not in the ``BUNDESLAENDER`` registry.
    """
    # Imported locally to keep this module importable without the state registry.
    from .bundeslaender import BUNDESLAENDER

    if bundesland not in BUNDESLAENDER:
        raise ValueError(f"Unbekanntes Bundesland: {bundesland}")
    regierungsfraktionen = BUNDESLAENDER[bundesland].regierungsfraktionen

    # Stored chunks carry the registry spelling of the party ("AfD", "GRÜNE").
    # Plain upper-casing turns "AfD" into "AFD", which never matches a stored
    # row, so names are resolved case-insensitively to the registry spelling.
    canonical = {info["partei"].upper(): info["partei"] for info in PROGRAMME.values()}

    # Normalise first, then dedupe order-preservingly, so "spd" and "SPD"
    # are searched once. Unpacking accepts lists and tuples alike.
    parteien_to_search = list(dict.fromkeys(
        canonical.get(name.upper(), name.upper())
        for name in [*fraktionen, *regierungsfraktionen]
    ))

    results = {}
    for partei in parteien_to_search:
        # NOTE: each find_relevant_chunks call embeds antrag_text again.
        # Wahlprogramm, filtered by state.
        wahl_chunks = find_relevant_chunks(
            antrag_text,
            parteien=[partei],
            typ="wahlprogramm",
            bundesland=bundesland,
            top_k=top_k_per_partei,
            min_similarity=0.45,
        )
        # Parteiprogramm (federal Grundsatzprogramm). Its rows have
        # bundesland NULL, which the state filter also matches.
        partei_chunks = find_relevant_chunks(
            antrag_text,
            parteien=[partei],
            typ="parteiprogramm",
            bundesland=bundesland,
            top_k=top_k_per_partei,
            min_similarity=0.45,
        )
        if wahl_chunks or partei_chunks:
            results[partei] = {
                "wahlprogramm": wahl_chunks,
                "parteiprogramm": partei_chunks,
            }
    return results
def format_quotes_for_prompt(quotes: dict) -> str:
    """Render the per-party quotes as a Markdown section for the LLM prompt.

    Args:
        quotes: Mapping of party name to a dict that may contain the lists
            ``"wahlprogramm"`` and ``"parteiprogramm"``; each list entry
            provides ``"seite"`` and ``"text"``.

    Returns:
        The Markdown text, or an empty string when ``quotes`` is empty.
        Passages longer than 500 characters are cut and suffixed with "...".
    """
    if not quotes:
        return ""

    # (key in the party dict, heading emitted before its passages)
    sections = (
        ("wahlprogramm", "**Wahlprogramm:**"),
        ("parteiprogramm", "\n**Grundsatzprogramm:**"),
    )

    out = ["\n## Relevante Passagen aus Wahl- und Parteiprogrammen\n"]
    for partei, data in quotes.items():
        out.append(f"\n### {partei}\n")
        for key, heading in sections:
            entries = data.get(key)
            if not entries:
                continue
            out.append(heading)
            for entry in entries:
                passage = entry["text"]
                if len(passage) > 500:
                    passage = passage[:500] + "..."
                out.append(f'- S. {entry["seite"]}: "{passage}"')
    return "\n".join(out)
def get_programme_info() -> list[dict]:
    """List the metadata of every programme in the ``PROGRAMME`` registry.

    Returns:
        One dict per programme, in registry order, with the keys ``id``,
        ``name``, ``typ``, ``partei``, ``bundesland`` (``None`` when the
        programme has none), ``pdf`` and ``pdf_url``.
    """
    return [
        {
            "id": prog_id,
            "name": meta["name"],
            "typ": meta["typ"],
            "partei": meta["partei"],
            "bundesland": meta.get("bundesland"),
            "pdf": meta["pdf"],
            "pdf_url": f"/static/referenzen/{meta['pdf']}",
        }
        for prog_id, meta in PROGRAMME.items()
    ]
def get_indexing_status() -> dict:
    """Report how many chunks are indexed for each registered programme.

    A missing database file is treated like an empty index, so the result has
    the same shape in every case.

    Returns:
        A dict with the keys

        - ``indexed``: number of programmes from ``PROGRAMME`` that have at
          least one chunk in the database,
        - ``total``: number of programmes in ``PROGRAMME``,
        - ``programmes``: one dict per registry entry with ``id``, ``name``,
          ``partei``, ``chunks`` and ``indexed``.
    """
    chunk_counts = {}
    if EMBEDDINGS_DB.exists():
        conn = sqlite3.connect(EMBEDDINGS_DB)
        try:
            # Count chunks per programme.
            chunk_counts = dict(conn.execute("""
                SELECT programm_id, COUNT(*) AS chunks
                FROM chunks
                GROUP BY programm_id
            """).fetchall())
        finally:
            conn.close()

    programmes = [
        {
            "id": prog_id,
            "name": info["name"],
            "partei": info["partei"],
            "chunks": chunk_counts.get(prog_id, 0),
            "indexed": prog_id in chunk_counts,
        }
        for prog_id, info in PROGRAMME.items()
    ]
    return {
        # Count registry programmes only: ids left in the database but gone
        # from PROGRAMME must not push "indexed" above "total".
        "indexed": sum(1 for entry in programmes if entry["indexed"]),
        "total": len(PROGRAMME),
        "programmes": programmes,
    }