gwoe-antragspruefer/app/embeddings.py
Dotty Dotter 63de3ca20d Initial commit: GWÖ-Antragsprüfer v1.0
Features:
- GWÖ-Matrix 2.0 Analyse für NRW-Landtagsanträge
- Verbesserungsvorschläge im Redline-Format (Original/Vorschlag/Begründung)
- Wahlprogramm- und Parteiprogrammtreue-Bewertung
- Landtag-Suche via OPAL-API
- Tag-Wolke mit Multi-Select Filter
- Partei-Filter mit Durchschnittswerten
- PDF-Report-Generierung
- Security Headers (CSP, X-Frame-Options, etc.)
- Persistente SQLite-DB via Docker Volumes

Tech Stack:
- FastAPI + Jinja2
- Qwen LLM via DashScope API
- SQLite + aiosqlite
- WeasyPrint für PDF
- Docker Compose mit Traefik
2026-03-28 22:30:24 +01:00

388 lines
11 KiB
Python

"""Semantic search for Wahlprogramme and Parteiprogramme using Qwen embeddings."""
import json
import sqlite3
from pathlib import Path
from typing import Optional
import fitz # PyMuPDF
from openai import OpenAI
from .config import settings
# Embedding model
EMBEDDING_MODEL = "text-embedding-v3"
EMBEDDING_DIMENSIONS = 1024
# Database path
EMBEDDINGS_DB = settings.data_dir / "embeddings.db"
# Programme definitions
PROGRAMME = {
# Wahlprogramme NRW 2022
"spd-nrw-2022": {
"name": "SPD NRW Wahlprogramm 2022",
"typ": "wahlprogramm",
"partei": "SPD",
"bundesland": "NRW",
"pdf": "spd-nrw-2022.pdf",
},
"cdu-nrw-2022": {
"name": "CDU NRW Wahlprogramm 2022",
"typ": "wahlprogramm",
"partei": "CDU",
"bundesland": "NRW",
"pdf": "cdu-nrw-2022.pdf",
},
"gruene-nrw-2022": {
"name": "Grüne NRW Wahlprogramm 2022",
"typ": "wahlprogramm",
"partei": "GRÜNE",
"bundesland": "NRW",
"pdf": "gruene-nrw-2022.pdf",
},
"fdp-nrw-2022": {
"name": "FDP NRW Wahlprogramm 2022",
"typ": "wahlprogramm",
"partei": "FDP",
"bundesland": "NRW",
"pdf": "fdp-nrw-2022.pdf",
},
"afd-nrw-2022": {
"name": "AfD NRW Wahlprogramm 2022",
"typ": "wahlprogramm",
"partei": "AfD",
"bundesland": "NRW",
"pdf": "afd-nrw-2022.pdf",
},
# Grundsatzprogramme (Bund)
"spd-grundsatz": {
"name": "SPD Grundsatzprogramm 2007",
"typ": "parteiprogramm",
"partei": "SPD",
"pdf": "spd-grundsatzprogramm.pdf",
},
"cdu-grundsatz": {
"name": "CDU Grundsatzprogramm 2007",
"typ": "parteiprogramm",
"partei": "CDU",
"pdf": "cdu-grundsatzprogramm.pdf",
},
"gruene-grundsatz": {
"name": "Grüne Grundsatzprogramm 2020",
"typ": "parteiprogramm",
"partei": "GRÜNE",
"pdf": "gruene-grundsatzprogramm.pdf",
},
"fdp-grundsatz": {
"name": "FDP Grundsatzprogramm 2012",
"typ": "parteiprogramm",
"partei": "FDP",
"pdf": "fdp-grundsatzprogramm.pdf",
},
}
def init_embeddings_db():
"""Initialize the embeddings database."""
conn = sqlite3.connect(EMBEDDINGS_DB)
conn.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id INTEGER PRIMARY KEY,
programm_id TEXT NOT NULL,
partei TEXT NOT NULL,
typ TEXT NOT NULL,
seite INTEGER,
text TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_partei ON chunks(partei)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_typ ON chunks(typ)")
conn.commit()
conn.close()
def get_client() -> OpenAI:
"""Get DashScope client."""
return OpenAI(
api_key=settings.dashscope_api_key,
base_url=settings.dashscope_base_url,
)
def create_embedding(text: str) -> list[float]:
"""Create embedding for text using Qwen."""
client = get_client()
response = client.embeddings.create(
model=EMBEDDING_MODEL,
input=text,
dimensions=EMBEDDING_DIMENSIONS,
)
return response.data[0].embedding
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
"""Split text into overlapping chunks by words."""
words = text.split()
chunks = []
i = 0
while i < len(words):
chunk_words = words[i:i + chunk_size]
chunk = " ".join(chunk_words)
if chunk.strip():
chunks.append(chunk)
i += chunk_size - overlap
return chunks
def extract_text_with_pages(pdf_path: Path) -> list[tuple[int, str]]:
"""Extract text from PDF with page numbers."""
doc = fitz.open(pdf_path)
pages = []
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text()
if text.strip():
pages.append((page_num + 1, text))
doc.close()
return pages
def index_programm(programm_id: str, pdf_dir: Path) -> int:
"""Index a single program PDF into embeddings database."""
if programm_id not in PROGRAMME:
raise ValueError(f"Unknown program: {programm_id}")
info = PROGRAMME[programm_id]
pdf_path = pdf_dir / info["pdf"]
if not pdf_path.exists():
print(f"PDF not found: {pdf_path}")
return 0
conn = sqlite3.connect(EMBEDDINGS_DB)
# Remove existing chunks for this program
conn.execute("DELETE FROM chunks WHERE programm_id = ?", (programm_id,))
# Extract and chunk
pages = extract_text_with_pages(pdf_path)
total_chunks = 0
for page_num, page_text in pages:
chunks = chunk_text(page_text, chunk_size=400, overlap=50)
for chunk_text_content in chunks:
if len(chunk_text_content.split()) < 20: # Skip tiny chunks
continue
try:
embedding = create_embedding(chunk_text_content)
embedding_blob = json.dumps(embedding).encode()
conn.execute("""
INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding)
VALUES (?, ?, ?, ?, ?, ?)
""", (
programm_id,
info["partei"],
info["typ"],
page_num,
chunk_text_content,
embedding_blob,
))
total_chunks += 1
except Exception as e:
print(f"Error embedding chunk: {e}")
continue
conn.commit()
conn.close()
print(f"Indexed {total_chunks} chunks from {programm_id}")
return total_chunks
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x * x for x in a) ** 0.5
norm_b = sum(x * x for x in b) ** 0.5
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def find_relevant_chunks(
query: str,
parteien: list[str] = None,
typ: str = None,
top_k: int = 3,
min_similarity: float = 0.5,
) -> list[dict]:
"""Find most relevant chunks for a query."""
query_embedding = create_embedding(query)
conn = sqlite3.connect(EMBEDDINGS_DB)
conn.row_factory = sqlite3.Row
# Build query
sql = "SELECT * FROM chunks WHERE 1=1"
params = []
if parteien:
placeholders = ",".join("?" * len(parteien))
sql += f" AND partei IN ({placeholders})"
params.extend(parteien)
if typ:
sql += " AND typ = ?"
params.append(typ)
rows = conn.execute(sql, params).fetchall()
conn.close()
# Calculate similarities
results = []
for row in rows:
chunk_embedding = json.loads(row["embedding"])
similarity = cosine_similarity(query_embedding, chunk_embedding)
if similarity >= min_similarity:
results.append({
"programm_id": row["programm_id"],
"partei": row["partei"],
"typ": row["typ"],
"seite": row["seite"],
"text": row["text"],
"similarity": similarity,
})
# Sort by similarity and return top_k
results.sort(key=lambda x: x["similarity"], reverse=True)
return results[:top_k]
def get_relevant_quotes_for_antrag(
antrag_text: str,
fraktionen: list[str],
top_k_per_partei: int = 2,
) -> dict[str, list[dict]]:
"""Get relevant quotes from Wahl- and Parteiprogramme for an Antrag."""
results = {}
for partei in fraktionen + ["CDU", "GRÜNE"]: # Include Regierungsfraktionen
partei_upper = partei.upper() if partei != "GRÜNE" else "GRÜNE"
# Wahlprogramm
wahl_chunks = find_relevant_chunks(
antrag_text,
parteien=[partei_upper],
typ="wahlprogramm",
top_k=top_k_per_partei,
min_similarity=0.45,
)
# Parteiprogramm
partei_chunks = find_relevant_chunks(
antrag_text,
parteien=[partei_upper],
typ="parteiprogramm",
top_k=top_k_per_partei,
min_similarity=0.45,
)
if wahl_chunks or partei_chunks:
results[partei_upper] = {
"wahlprogramm": wahl_chunks,
"parteiprogramm": partei_chunks,
}
return results
def format_quotes_for_prompt(quotes: dict) -> str:
"""Format quotes for inclusion in LLM prompt."""
if not quotes:
return ""
lines = ["\n## Relevante Passagen aus Wahl- und Parteiprogrammen\n"]
for partei, data in quotes.items():
lines.append(f"\n### {partei}\n")
if data.get("wahlprogramm"):
lines.append("**Wahlprogramm NRW 2022:**")
for chunk in data["wahlprogramm"]:
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
lines.append(f'- S. {chunk["seite"]}: "{text}"')
if data.get("parteiprogramm"):
lines.append("\n**Grundsatzprogramm:**")
for chunk in data["parteiprogramm"]:
text = chunk["text"][:500] + "..." if len(chunk["text"]) > 500 else chunk["text"]
lines.append(f'- S. {chunk["seite"]}: "{text}"')
return "\n".join(lines)
def get_programme_info() -> list[dict]:
"""Get list of all indexed programmes with metadata."""
info_list = []
for prog_id, info in PROGRAMME.items():
info_list.append({
"id": prog_id,
"name": info["name"],
"typ": info["typ"],
"partei": info["partei"],
"bundesland": info.get("bundesland"),
"pdf": info["pdf"],
"pdf_url": f"/static/referenzen/{info['pdf']}",
})
return info_list
def get_indexing_status() -> dict:
"""Get status of indexed programmes."""
if not EMBEDDINGS_DB.exists():
return {"indexed": 0, "programmes": []}
conn = sqlite3.connect(EMBEDDINGS_DB)
# Count chunks per program
rows = conn.execute("""
SELECT programm_id, COUNT(*) as chunks
FROM chunks
GROUP BY programm_id
""").fetchall()
conn.close()
indexed = {row[0]: row[1] for row in rows}
programmes = []
for prog_id, info in PROGRAMME.items():
programmes.append({
"id": prog_id,
"name": info["name"],
"partei": info["partei"],
"chunks": indexed.get(prog_id, 0),
"indexed": prog_id in indexed,
})
return {
"indexed": len(indexed),
"total": len(PROGRAMME),
"programmes": programmes,
}