"""Semantic search for Wahlprogramme and Parteiprogramme using Qwen embeddings.""" import json import sqlite3 from pathlib import Path from typing import Optional import fitz # PyMuPDF from openai import OpenAI from .config import settings # Embedding model EMBEDDING_MODEL = "text-embedding-v3" EMBEDDING_DIMENSIONS = 1024 # Database path EMBEDDINGS_DB = settings.data_dir / "embeddings.db" # Programme definitions PROGRAMME = { # Wahlprogramme NRW 2022 "spd-nrw-2022": { "name": "SPD NRW Wahlprogramm 2022", "typ": "wahlprogramm", "partei": "SPD", "bundesland": "NRW", "pdf": "spd-nrw-2022.pdf", }, "cdu-nrw-2022": { "name": "CDU NRW Wahlprogramm 2022", "typ": "wahlprogramm", "partei": "CDU", "bundesland": "NRW", "pdf": "cdu-nrw-2022.pdf", }, "gruene-nrw-2022": { "name": "Grüne NRW Wahlprogramm 2022", "typ": "wahlprogramm", "partei": "GRÜNE", "bundesland": "NRW", "pdf": "gruene-nrw-2022.pdf", }, "fdp-nrw-2022": { "name": "FDP NRW Wahlprogramm 2022", "typ": "wahlprogramm", "partei": "FDP", "bundesland": "NRW", "pdf": "fdp-nrw-2022.pdf", }, "afd-nrw-2022": { "name": "AfD NRW Wahlprogramm 2022", "typ": "wahlprogramm", "partei": "AfD", "bundesland": "NRW", "pdf": "afd-nrw-2022.pdf", }, # Sachsen-Anhalt (LTW 2021) "cdu-lsa-2021": { "name": "CDU Sachsen-Anhalt Regierungsprogramm 2021", "typ": "wahlprogramm", "partei": "CDU", "bundesland": "LSA", "pdf": "cdu-lsa-2021.pdf", }, "spd-lsa-2021": { "name": "SPD Sachsen-Anhalt Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "SPD", "bundesland": "LSA", "pdf": "spd-lsa-2021.pdf", }, "gruene-lsa-2021": { "name": "Grüne Sachsen-Anhalt Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "GRÜNE", "bundesland": "LSA", "pdf": "gruene-lsa-2021.pdf", }, "fdp-lsa-2021": { "name": "FDP Sachsen-Anhalt Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "FDP", "bundesland": "LSA", "pdf": "fdp-lsa-2021.pdf", }, "afd-lsa-2021": { "name": "AfD Sachsen-Anhalt Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "AfD", "bundesland": "LSA", "pdf": "afd-lsa-2021.pdf", }, "linke-lsa-2021": { "name": "DIE LINKE Sachsen-Anhalt Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "LINKE", "bundesland": "LSA", "pdf": "linke-lsa-2021.pdf", }, # Mecklenburg-Vorpommern (LTW 26.09.2021, WP 8) — Issue #4 "cdu-mv-2021": { "name": "CDU Mecklenburg-Vorpommern Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "CDU", "bundesland": "MV", "pdf": "cdu-mv-2021.pdf", }, "spd-mv-2021": { "name": "SPD Mecklenburg-Vorpommern Regierungsprogramm 2021", "typ": "wahlprogramm", "partei": "SPD", "bundesland": "MV", "pdf": "spd-mv-2021.pdf", }, "gruene-mv-2021": { "name": "Grüne Mecklenburg-Vorpommern Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "GRÜNE", "bundesland": "MV", "pdf": "gruene-mv-2021.pdf", }, "fdp-mv-2021": { "name": "FDP Mecklenburg-Vorpommern Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "FDP", "bundesland": "MV", "pdf": "fdp-mv-2021.pdf", }, "afd-mv-2021": { "name": "AfD Mecklenburg-Vorpommern Landeswahlprogramm 2021", "typ": "wahlprogramm", "partei": "AfD", "bundesland": "MV", "pdf": "afd-mv-2021.pdf", }, "linke-mv-2021": { "name": "DIE LINKE Mecklenburg-Vorpommern Zukunftsprogramm 2021", "typ": "wahlprogramm", "partei": "LINKE", "bundesland": "MV", "pdf": "linke-mv-2021.pdf", }, # Berlin (AGH-Wahl 26.09.2021, Wiederholung 12.02.2023, WP 19) — # Issue #10. Programme stammen aus dem Wahlkampf 2021 — die # Wiederholungswahl 2023 nutzte dieselben Programme. "cdu-be-2023": { "name": "CDU Berlin Berlin-Plan 2021", "typ": "wahlprogramm", "partei": "CDU", "bundesland": "BE", "pdf": "cdu-be-2023.pdf", }, "spd-be-2023": { "name": "SPD Berlin Wahlprogramm AGH 2021", "typ": "wahlprogramm", "partei": "SPD", "bundesland": "BE", "pdf": "spd-be-2023.pdf", }, "gruene-be-2023": { "name": "Grüne Berlin Landeswahlprogramm 2021", "typ": "wahlprogramm", "partei": "GRÜNE", "bundesland": "BE", "pdf": "gruene-be-2023.pdf", }, "linke-be-2023": { "name": "DIE LINKE Berlin Wahlprogramm 2021", "typ": "wahlprogramm", "partei": "LINKE", "bundesland": "BE", "pdf": "linke-be-2023.pdf", }, "afd-be-2023": { "name": "AfD Berlin Wahlprogramm AGH 2021", "typ": "wahlprogramm", "partei": "AfD", "bundesland": "BE", "pdf": "afd-be-2023.pdf", }, # Grundsatzprogramme (Bund) "spd-grundsatz": { "name": "SPD Grundsatzprogramm 2007", "typ": "parteiprogramm", "partei": "SPD", "pdf": "spd-grundsatzprogramm.pdf", }, "cdu-grundsatz": { "name": "CDU Grundsatzprogramm 2007", "typ": "parteiprogramm", "partei": "CDU", "pdf": "cdu-grundsatzprogramm.pdf", }, "gruene-grundsatz": { "name": "Grüne Grundsatzprogramm 2020", "typ": "parteiprogramm", "partei": "GRÜNE", "pdf": "gruene-grundsatzprogramm.pdf", }, "fdp-grundsatz": { "name": "FDP Grundsatzprogramm 2012", "typ": "parteiprogramm", "partei": "FDP", "pdf": "fdp-grundsatzprogramm.pdf", }, } def init_embeddings_db(): """Initialize the embeddings database. Includes a forward-only migration step (Issue #5): adds the ``bundesland`` column if missing and backfills existing rows from the ``PROGRAMME`` registry. Grundsatzprogramme (federal level) keep ``bundesland = NULL``; the ``find_relevant_chunks`` query treats NULL as "matches any state". """ conn = sqlite3.connect(EMBEDDINGS_DB) conn.execute(""" CREATE TABLE IF NOT EXISTS chunks ( id INTEGER PRIMARY KEY, programm_id TEXT NOT NULL, partei TEXT NOT NULL, typ TEXT NOT NULL, seite INTEGER, text TEXT NOT NULL, embedding BLOB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_partei ON chunks(partei)") conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_typ ON chunks(typ)") # Migration: bundesland-Spalte ergänzen, falls Tabelle aus Pre-#5-Zeit cols = {row[1] for row in conn.execute("PRAGMA table_info(chunks)").fetchall()} if "bundesland" not in cols: conn.execute("ALTER TABLE chunks ADD COLUMN bundesland TEXT") conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_bundesland ON chunks(bundesland)") # Backfill: Bundesland aus PROGRAMME-Registry für bestehende Zeilen # nachtragen. Grundsatzprogramme bleiben NULL. for prog_id, info in PROGRAMME.items(): bl = info.get("bundesland") if bl is not None: conn.execute( "UPDATE chunks SET bundesland = ? WHERE programm_id = ? AND bundesland IS NULL", (bl, prog_id), ) conn.commit() conn.close() def get_client() -> OpenAI: """Get DashScope client.""" return OpenAI( api_key=settings.dashscope_api_key, base_url=settings.dashscope_base_url, ) def create_embedding(text: str) -> list[float]: """Create embedding for text using Qwen.""" client = get_client() response = client.embeddings.create( model=EMBEDDING_MODEL, input=text, dimensions=EMBEDDING_DIMENSIONS, ) return response.data[0].embedding def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]: """Split text into overlapping chunks by words.""" words = text.split() chunks = [] i = 0 while i < len(words): chunk_words = words[i:i + chunk_size] chunk = " ".join(chunk_words) if chunk.strip(): chunks.append(chunk) i += chunk_size - overlap return chunks def extract_text_with_pages(pdf_path: Path) -> list[tuple[int, str]]: """Extract text from PDF with page numbers.""" doc = fitz.open(pdf_path) pages = [] for page_num in range(len(doc)): page = doc[page_num] text = page.get_text() if text.strip(): pages.append((page_num + 1, text)) doc.close() return pages def index_programm(programm_id: str, pdf_dir: Path) -> int: """Index a single program PDF into embeddings database.""" if programm_id not in PROGRAMME: raise ValueError(f"Unknown program: {programm_id}") info = PROGRAMME[programm_id] pdf_path = pdf_dir / info["pdf"] if not pdf_path.exists(): print(f"PDF not found: {pdf_path}") return 0 conn = sqlite3.connect(EMBEDDINGS_DB) # Remove existing chunks for this program conn.execute("DELETE FROM chunks WHERE programm_id = ?", (programm_id,)) # Extract and chunk pages = extract_text_with_pages(pdf_path) total_chunks = 0 for page_num, page_text in pages: chunks = chunk_text(page_text, chunk_size=400, overlap=50) for chunk_text_content in chunks: if len(chunk_text_content.split()) < 20: # Skip tiny chunks continue try: embedding = create_embedding(chunk_text_content) embedding_blob = json.dumps(embedding).encode() conn.execute(""" INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( programm_id, info["partei"], info["typ"], page_num, chunk_text_content, embedding_blob, info.get("bundesland"), # NULL für Grundsatzprogramme )) total_chunks += 1 except Exception as e: print(f"Error embedding chunk: {e}") continue conn.commit() conn.close() print(f"Indexed {total_chunks} chunks from {programm_id}") return total_chunks def cosine_similarity(a: list[float], b: list[float]) -> float: """Calculate cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b)) norm_a = sum(x * x for x in a) ** 0.5 norm_b = sum(x * x for x in b) ** 0.5 if norm_a == 0 or norm_b == 0: return 0.0 return dot / (norm_a * norm_b) def find_relevant_chunks( query: str, parteien: list[str] = None, typ: str = None, bundesland: str = None, top_k: int = 3, min_similarity: float = 0.5, ) -> list[dict]: """Find most relevant chunks for a query. Args: bundesland: Wenn gesetzt, werden nur Chunks dieses Bundeslands ODER globale Chunks (bundesland IS NULL, z.B. Grundsatzprogramme) berücksichtigt. Wenn None, kein Filter. """ query_embedding = create_embedding(query) conn = sqlite3.connect(EMBEDDINGS_DB) conn.row_factory = sqlite3.Row # Build query sql = "SELECT * FROM chunks WHERE 1=1" params = [] if parteien: placeholders = ",".join("?" * len(parteien)) sql += f" AND partei IN ({placeholders})" params.extend(parteien) if typ: sql += " AND typ = ?" params.append(typ) if bundesland: # Bundesland-spezifische ODER globale Chunks (Grundsatzprogramme). sql += " AND (bundesland = ? OR bundesland IS NULL)" params.append(bundesland) rows = conn.execute(sql, params).fetchall() conn.close() # Calculate similarities results = [] for row in rows: chunk_embedding = json.loads(row["embedding"]) similarity = cosine_similarity(query_embedding, chunk_embedding) if similarity >= min_similarity: results.append({ "programm_id": row["programm_id"], "partei": row["partei"], "typ": row["typ"], "seite": row["seite"], "text": row["text"], "similarity": similarity, }) # Sort by similarity and return top_k results.sort(key=lambda x: x["similarity"], reverse=True) return results[:top_k] def get_relevant_quotes_for_antrag( antrag_text: str, fraktionen: list[str], bundesland: str, top_k_per_partei: int = 2, ) -> dict[str, list[dict]]: """Get relevant quotes from Wahl- and Parteiprogramme for an Antrag. Args: bundesland: Pflicht. Bestimmt, welche Wahlprogramme durchsucht werden und welche Regierungsfraktionen zusätzlich zu den Antragstellern einbezogen werden. """ # Lokaler Import vermeidet Zirkularität: bundeslaender.py importiert nichts # aus diesem Modul, aber der saubere Trennstrich bleibt erhalten. from .bundeslaender import BUNDESLAENDER if bundesland not in BUNDESLAENDER: raise ValueError(f"Unbekanntes Bundesland: {bundesland}") regierungsfraktionen = BUNDESLAENDER[bundesland].regierungsfraktionen parteien_to_search = list(dict.fromkeys(fraktionen + regierungsfraktionen)) # dedupe, Reihenfolge stabil results = {} for partei in parteien_to_search: partei_upper = partei.upper() if partei != "GRÜNE" else "GRÜNE" # Wahlprogramm — bundesland-gefiltert wahl_chunks = find_relevant_chunks( antrag_text, parteien=[partei_upper], typ="wahlprogramm", bundesland=bundesland, top_k=top_k_per_partei, min_similarity=0.45, ) # Parteiprogramm (Grundsatz, federal — bundesland=NULL matched implizit) partei_chunks = find_relevant_chunks( antrag_text, parteien=[partei_upper], typ="parteiprogramm", bundesland=bundesland, top_k=top_k_per_partei, min_similarity=0.45, ) if wahl_chunks or partei_chunks: results[partei_upper] = { "wahlprogramm": wahl_chunks, "parteiprogramm": partei_chunks, } return results def format_quotes_for_prompt(quotes: dict) -> str: """Format quotes for inclusion in LLM prompt.""" if not quotes: return "" lines = ["\n## Relevante Passagen aus Wahl- und Parteiprogrammen\n"] for partei, data in quotes.items(): lines.append(f"\n### {partei}\n") if data.get("wahlprogramm"): lines.append("**Wahlprogramm:**") for chunk in data["wahlprogramm"]: text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"] lines.append(f'- S. {chunk["seite"]}: "{text}"') if data.get("parteiprogramm"): lines.append("\n**Grundsatzprogramm:**") for chunk in data["parteiprogramm"]: text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"] lines.append(f'- S. {chunk["seite"]}: "{text}"') return "\n".join(lines) def get_programme_info() -> list[dict]: """Get list of all indexed programmes with metadata.""" info_list = [] for prog_id, info in PROGRAMME.items(): info_list.append({ "id": prog_id, "name": info["name"], "typ": info["typ"], "partei": info["partei"], "bundesland": info.get("bundesland"), "pdf": info["pdf"], "pdf_url": f"/static/referenzen/{info['pdf']}", }) return info_list def get_indexing_status() -> dict: """Get status of indexed programmes.""" if not EMBEDDINGS_DB.exists(): return {"indexed": 0, "programmes": []} conn = sqlite3.connect(EMBEDDINGS_DB) # Count chunks per program rows = conn.execute(""" SELECT programm_id, COUNT(*) as chunks FROM chunks GROUP BY programm_id """).fetchall() conn.close() indexed = {row[0]: row[1] for row in rows} programmes = [] for prog_id, info in PROGRAMME.items(): programmes.append({ "id": prog_id, "name": info["name"], "partei": info["partei"], "chunks": indexed.get(prog_id, 0), "indexed": prog_id in indexed, }) return { "indexed": len(indexed), "total": len(PROGRAMME), "programmes": programmes, }