diff --git a/.gitignore b/.gitignore index c82041c..4eae9af 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ reports/ .DS_Store Thumbs.db site/ +.coverage diff --git a/app/clustering.py b/app/clustering.py new file mode 100644 index 0000000..149768a --- /dev/null +++ b/app/clustering.py @@ -0,0 +1,312 @@ +"""Antrag-Clustering via Cosine-Similarity + Union-Find (#105). + +Nutzt die v4-Embeddings aus assessments.summary_embedding (gefüllt durch #123) +und baut eine hierarchische Cluster-Struktur ohne externe Dependencies +(kein sklearn, kein numpy — für <500 Assessments ist pure Python ausreichend). + +Algorithmus: Connected-Components via Union-Find über Kanten mit +Cosine-Similarity ≥ threshold. Level 0 = alle Anträge, Level 1 tighter Cluster. +Bei Clustern > 30 wird rekursiv mit höherem Threshold nachgeteilt. +""" + +import json +import logging +import math +from collections import Counter +from typing import Optional + +import aiosqlite + +from .config import settings + +logger = logging.getLogger(__name__) + +# Cosine-Similarity-Thresholds +# Empirisch kalibriert an der Prod-DB (57 Assessments, 2026-04-11): +# 0.50 → 6 sinnvolle Cluster + 26 singletons (bester Default) +# 0.55 → 5 tighter Cluster +# 0.60 → 4 kleine Cluster, zu streng (die meisten themenähnlichen +# Anträge fallen raus) +# 0.70+ → fast alle singletons +# v4-Embeddings auf deutschen Parlamentsanträgen clustern bei ~0.50. +DEFAULT_THRESHOLD = 0.55 +SUBCLUSTER_THRESHOLD = 0.70 +MAX_CLUSTER_SIZE = 30 # darüber: sub-clustern + + +# ─── Math-Helpers ─────────────────────────────────────────────────────────── + +def _cosine(a: list[float], b: list[float]) -> float: + dot = sum(x * y for x, y in zip(a, b)) + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(x * x for x in b)) + if na == 0 or nb == 0: + return 0.0 + return dot / (na * nb) + + +class UnionFind: + """Klassisches Union-Find mit Path-Compression.""" + + def __init__(self, n: int): + self.parent = list(range(n)) + self.rank = [0] * n + + def find(self, x: int) -> int: + root = x + while self.parent[root] != root: + root = self.parent[root] + # Path-Compression + while self.parent[x] != root: + self.parent[x], x = root, self.parent[x] + return root + + def union(self, a: int, b: int) -> None: + ra, rb = self.find(a), self.find(b) + if ra == rb: + return + if self.rank[ra] < self.rank[rb]: + self.parent[ra] = rb + elif self.rank[ra] > self.rank[rb]: + self.parent[rb] = ra + else: + self.parent[rb] = ra + self.rank[ra] += 1 + + +# ─── DB-Lader ─────────────────────────────────────────────────────────────── + +async def load_assessment_items( + bundesland: Optional[str] = None, +) -> list[dict]: + """Lädt alle Assessments mit gefülltem summary_embedding.""" + sql = """ + SELECT drucksache, title, fraktionen, datum, link, bundesland, + gwoe_score, empfehlung, empfehlung_symbol, themen, + summary_embedding + FROM assessments + WHERE summary_embedding IS NOT NULL + """ + params: list = [] + if bundesland: + sql += " AND bundesland = ?" + params.append(bundesland) + + items = [] + async with aiosqlite.connect(settings.db_path) as db: + db.row_factory = aiosqlite.Row + async with db.execute(sql, params) as cur: + async for row in cur: + try: + vec = json.loads(bytes(row["summary_embedding"]).decode()) + except Exception: + logger.warning("bad embedding for %s", row["drucksache"]) + continue + items.append({ + "drucksache": row["drucksache"], + "title": row["title"], + "fraktionen": json.loads(row["fraktionen"] or "[]"), + "datum": row["datum"], + "link": row["link"], + "bundesland": row["bundesland"], + "gwoe_score": row["gwoe_score"], + "empfehlung": row["empfehlung"], + "empfehlung_symbol": row["empfehlung_symbol"], + "themen": json.loads(row["themen"] or "[]"), + "embedding": vec, + }) + return items + + +# ─── Clustering ───────────────────────────────────────────────────────────── + +def _cluster_indices(items: list[dict], threshold: float) -> list[list[int]]: + """Union-Find-Clustering: Knoten = Items, Kante = cosine ≥ threshold.""" + n = len(items) + uf = UnionFind(n) + for i in range(n): + for j in range(i + 1, n): + if _cosine(items[i]["embedding"], items[j]["embedding"]) >= threshold: + uf.union(i, j) + + groups: dict[int, list[int]] = {} + for i in range(n): + root = uf.find(i) + groups.setdefault(root, []).append(i) + # Sortiere Cluster absteigend nach Größe + return sorted(groups.values(), key=len, reverse=True) + + +def _dominant_fraktion(items: list[dict]) -> Optional[str]: + counts: Counter = Counter() + for item in items: + for f in item.get("fraktionen") or []: + counts[f] += 1 + if not counts: + return None + return counts.most_common(1)[0][0] + + +def _cluster_label(items: list[dict]) -> str: + """Generiert ein Cluster-Label aus den häufigsten Themen der Mitglieder. + + Nimmt die Top-2-3 Themen die in der Mehrheit der Cluster-Mitglieder + vorkommen und kombiniert sie zu einem prägnanten Label. + Fallback: kürzester Titel. + """ + # Themen-Häufigkeit über alle Cluster-Mitglieder + themen_counts: Counter = Counter() + for item in items: + for thema in item.get("themen") or []: + themen_counts[thema] += 1 + + if themen_counts: + # Top-Themen die in ≥50% der Mitglieder vorkommen, max 3 + threshold = max(1, len(items) // 2) + top = [t for t, c in themen_counts.most_common(5) if c >= threshold][:3] + if top: + return " · ".join(top) + + # Fallback: kürzester Titel + titles = [i["title"] for i in items if i.get("title")] + if titles: + return min(titles, key=len) + return "Cluster" + + +def _cluster_summary(cluster_items: list[dict], include_edges: bool = False) -> dict: + """Zusammenfassung eines Clusters für die API-Antwort.""" + scores = [i["gwoe_score"] for i in cluster_items if i.get("gwoe_score") is not None] + avg_score = round(sum(scores) / len(scores), 1) if scores else None + out = { + "size": len(cluster_items), + "label": _cluster_label(cluster_items), + "dominant_fraktion": _dominant_fraktion(cluster_items), + "avg_gwoe_score": avg_score, + "drucksachen": [i["drucksache"] for i in cluster_items], + } + if include_edges: + # Detail-Items pro Mitglied (für Force-Graph-Rendering) + out["nodes"] = [ + { + "drucksache": i["drucksache"], + "title": i["title"], + "bundesland": i["bundesland"], + "fraktionen": i["fraktionen"], + "gwoe_score": i["gwoe_score"], + "empfehlung": i["empfehlung"], + } + for i in cluster_items + ] + # Pairwise Cosine-Similarity als Kanten + edges = [] + for a in range(len(cluster_items)): + for b in range(a + 1, len(cluster_items)): + sim = _cosine(cluster_items[a]["embedding"], cluster_items[b]["embedding"]) + edges.append({"a": a, "b": b, "sim": round(sim, 3)}) + out["edges"] = edges + return out + + +async def build_hierarchy( + bundesland: Optional[str] = None, + threshold: float = DEFAULT_THRESHOLD, + subcluster_threshold: float = SUBCLUSTER_THRESHOLD, + max_cluster_size: int = MAX_CLUSTER_SIZE, +) -> dict: + """Lädt Assessments, clustert sie hierarchisch und gibt eine serialisierbare + Struktur zurück: + + { + "meta": {"total": N, "threshold": 0.70, ...}, + "clusters": [ + {"size": 12, "label": ..., "dominant_fraktion": ..., + "drucksachen": [...], "subclusters": [ ... ] | None}, + ... + ], + "singletons": [drucksache, drucksache, ...] + } + + Bei Clustern größer als max_cluster_size wird rekursiv mit + subcluster_threshold ein zweiter Durchgang gestartet. + """ + items = await load_assessment_items(bundesland=bundesland) + if not items: + return { + "meta": {"total": 0, "threshold": threshold, "bundesland": bundesland}, + "clusters": [], + "singletons": [], + } + + top_groups = _cluster_indices(items, threshold) + + clusters_out: list[dict] = [] + singletons_out: list[str] = [] + + for group in top_groups: + if len(group) == 1: + singletons_out.append(items[group[0]]["drucksache"]) + continue + + cluster_items = [items[i] for i in group] + entry = _cluster_summary(cluster_items, include_edges=True) + + # Sub-Clustern falls zu groß + if len(cluster_items) > max_cluster_size: + sub_groups = _cluster_indices(cluster_items, subcluster_threshold) + subs = [] + for sg in sub_groups: + if len(sg) == 1: + continue + subs.append(_cluster_summary([cluster_items[i] for i in sg])) + entry["subclusters"] = subs + else: + entry["subclusters"] = None + + clusters_out.append(entry) + + return { + "meta": { + "total": len(items), + "threshold": threshold, + "subcluster_threshold": subcluster_threshold, + "max_cluster_size": max_cluster_size, + "bundesland": bundesland, + "num_clusters": len(clusters_out), + "num_singletons": len(singletons_out), + }, + "clusters": clusters_out, + "singletons": singletons_out, + } + + +# ─── Ähnlichkeits-Suche für #108 Teil B ───────────────────────────────────── + +async def find_similar_assessments(drucksache: str, top_k: int = 5) -> list[dict]: + """Findet die top_k ähnlichsten Assessments zu einem gegebenen per + Cosine-Similarity über das Summary-Embedding.""" + items = await load_assessment_items() + target = next((i for i in items if i["drucksache"] == drucksache), None) + if target is None: + return [] + + scored = [] + for other in items: + if other["drucksache"] == drucksache: + continue + sim = _cosine(target["embedding"], other["embedding"]) + scored.append((sim, other)) + scored.sort(key=lambda t: t[0], reverse=True) + + return [ + { + "drucksache": other["drucksache"], + "title": other["title"], + "bundesland": other["bundesland"], + "fraktionen": other["fraktionen"], + "gwoe_score": other["gwoe_score"], + "empfehlung": other["empfehlung"], + "similarity": round(sim, 3), + } + for sim, other in scored[:top_k] + ] diff --git a/app/drucksache_typen.py b/app/drucksache_typen.py new file mode 100644 index 0000000..e6941f4 --- /dev/null +++ b/app/drucksache_typen.py @@ -0,0 +1,88 @@ +"""Drucksache-Typ-Normalisierung (#127). + +Jeder Landtag hat eigene Bezeichnungen für Dokumenttypen. Dieses Modul +normalisiert sie auf einheitliche Kategorien und bestimmt ob eine +Drucksache abstimmbar ist (= GWÖ-Bewertung sinnvoll). +""" + +# Normierte Kategorien +ANTRAG = "antrag" +GESETZENTWURF = "gesetzentwurf" +AENDERUNGSANTRAG = "aenderungsantrag" +DRINGLICHKEITSANTRAG = "dringlichkeitsantrag" +ENTSCHLIESSUNGSANTRAG = "entschliessungsantrag" +BESCHLUSSEMPFEHLUNG = "beschlussempfehlung" +KLEINE_ANFRAGE = "kleine_anfrage" +GROSSE_ANFRAGE = "grosse_anfrage" +UNTERRICHTUNG = "unterrichtung" +PETITION = "petition" +WAHLVORSCHLAG = "wahlvorschlag" +BERICHT = "bericht" +SONSTIGE = "sonstige" + +ABSTIMMBARE_TYPEN = { + ANTRAG, + GESETZENTWURF, + AENDERUNGSANTRAG, + DRINGLICHKEITSANTRAG, + ENTSCHLIESSUNGSANTRAG, +} + +# Übersetzungstabelle: Original-Typ (lowercase) → normierter Typ. +# Keys werden case-insensitive + substring-matched. +# Reihenfolge: spezifischere zuerst (z.B. "kleine anfrage" vor "anfrage"). +_TYP_MAP = [ + # Abstimmbar + ("gesetzentwurf", GESETZENTWURF), + ("änderungsantrag", AENDERUNGSANTRAG), + ("aenderungsantrag", AENDERUNGSANTRAG), + ("dringlichkeitsantrag", DRINGLICHKEITSANTRAG), + ("entschließungsantrag", ENTSCHLIESSUNGSANTRAG), + ("entschliessungsantrag", ENTSCHLIESSUNGSANTRAG), + ("antrag gemäß", ANTRAG), + ("antrag", ANTRAG), + # Nicht abstimmbar + ("kleine anfrage", KLEINE_ANFRAGE), + ("große anfrage", GROSSE_ANFRAGE), + ("grosse anfrage", GROSSE_ANFRAGE), + ("anfrage", KLEINE_ANFRAGE), + ("beschlussempfehlung", BESCHLUSSEMPFEHLUNG), + ("unterrichtung", UNTERRICHTUNG), + ("bericht", BERICHT), + ("mitteilung", UNTERRICHTUNG), + ("vorlage", UNTERRICHTUNG), + ("petition", PETITION), + ("wahlvorschlag", WAHLVORSCHLAG), + ("stellungnahme", SONSTIGE), + ("drucksache", SONSTIGE), +] + + +def normalize_typ(original: str) -> str: + """Normalisiert einen BL-spezifischen Typ-String auf eine Kategorie. + + Case-insensitiv, Substring-Match, spezifischere Patterns zuerst. + """ + if not original: + return SONSTIGE + low = original.lower().strip() + for pattern, norm in _TYP_MAP: + if pattern in low: + return norm + return SONSTIGE + + +def ist_abstimmbar(typ_normiert: str) -> bool: + """Prüft ob ein normierter Typ zur Abstimmung steht. + + ``sonstige`` wird durchgelassen (benefit of the doubt) — wenn der + Adapter den Typ nicht bestimmen kann (z.B. NRW liefert nur + "Drucksache"), wird der echte Check erst beim Analysieren gemacht + (aus dem Dokument-Text). + """ + return typ_normiert in ABSTIMMBARE_TYPEN or typ_normiert == SONSTIGE + + +def ist_abstimmbar_original(original: str) -> bool: + """Convenience: prüft direkt am Original-Typ-String.""" + return ist_abstimmbar(normalize_typ(original)) diff --git a/app/embeddings.py b/app/embeddings.py index 920f764..e77b8ef 100644 --- a/app/embeddings.py +++ b/app/embeddings.py @@ -15,9 +15,15 @@ from openai import OpenAI from .config import settings -# Embedding model -EMBEDDING_MODEL = "text-embedding-v3" -EMBEDDING_DIMENSIONS = 1024 +# Embedding-Modell (Issue #123 Migration v3 → v4): +# WRITE = Modell für neue Embeddings (Reindex, neue Assessments, neue Queries) +# READ = Modell, nach dem find_relevant_chunks filtert +# Zwei Settings erlauben Zero-Downtime-Switch. Während der Reindex läuft, bleibt +# READ auf v3 (Prod funktioniert), WRITE produziert v4 parallel. Nach Reindex: +# READ auf v4 flippen, alte v3-Rows löschen. +EMBEDDING_MODEL = settings.embedding_model_write +EMBEDDING_MODEL_READ = settings.embedding_model_read +EMBEDDING_DIMENSIONS = settings.embedding_dimensions # Database path EMBEDDINGS_DB = settings.data_dir / "embeddings.db" @@ -325,6 +331,14 @@ def init_embeddings_db(): conn.execute("ALTER TABLE chunks ADD COLUMN bundesland TEXT") conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_bundesland ON chunks(bundesland)") + # Migration #123: model-Spalte ergänzen. Bestehende Rows bekommen das alte + # v3-Default, neue Rows werden mit EMBEDDING_MODEL (aus config) befüllt. + if "model" not in cols: + conn.execute( + "ALTER TABLE chunks ADD COLUMN model TEXT NOT NULL DEFAULT 'text-embedding-v3'" + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_model ON chunks(model)") + # Backfill: Bundesland aus PROGRAMME-Registry für bestehende Zeilen # nachtragen. Grundsatzprogramme bleiben NULL. for prog_id, info in PROGRAMME.items(): @@ -347,17 +361,50 @@ def get_client() -> OpenAI: ) -def create_embedding(text: str) -> list[float]: - """Create embedding for text using Qwen.""" +def create_embedding(text: str, model: Optional[str] = None) -> list[float]: + """Create embedding for text using Qwen. + + Args: + model: Optionaler Override. Default = EMBEDDING_MODEL (write model). + Während der Migration #123 ruft find_relevant_chunks mit + EMBEDDING_MODEL_READ auf, damit Query-Embeddings im selben + Vektorraum wie die gespeicherten Chunks liegen. + """ client = get_client() response = client.embeddings.create( - model=EMBEDDING_MODEL, + model=model or EMBEDDING_MODEL, input=text, dimensions=EMBEDDING_DIMENSIONS, ) return response.data[0].embedding +# DashScope text-embedding-v4 erlaubt bis zu 10 Texte pro Batch-Call. +# 10 ist das harte Maximum — bei mehr gibt die API Fehler. +EMBEDDING_BATCH_SIZE = 10 + + +def create_embeddings_batch(texts: list[str], model: Optional[str] = None) -> list[list[float]]: + """Batch-Embedding — ein API-Call für bis zu EMBEDDING_BATCH_SIZE Texte. + + Gibt die Embeddings in derselben Reihenfolge wie die Input-Liste zurück. + Rate-Limit-freundlich: statt 10 sequentielle Calls genügt einer. + """ + if not texts: + return [] + if len(texts) > EMBEDDING_BATCH_SIZE: + raise ValueError(f"Batch zu groß: {len(texts)} > {EMBEDDING_BATCH_SIZE}") + client = get_client() + response = client.embeddings.create( + model=model or EMBEDDING_MODEL, + input=texts, + dimensions=EMBEDDING_DIMENSIONS, + ) + # DashScope gibt die Embeddings in der Reihenfolge zurück, in der sie + # gesendet wurden (index-basiert). Wir sortieren defensiv nach index. + return [d.embedding for d in sorted(response.data, key=lambda d: d.index)] + + def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]: """Split text into overlapping chunks by words.""" words = text.split() @@ -403,8 +450,13 @@ def index_programm(programm_id: str, pdf_dir: Path) -> int: conn = sqlite3.connect(EMBEDDINGS_DB) - # Remove existing chunks for this program - conn.execute("DELETE FROM chunks WHERE programm_id = ?", (programm_id,)) + # Remove existing chunks for this program — nur für das aktuelle WRITE- + # Modell, damit parallel existierende v3-Rows während der #123-Migration + # nicht verloren gehen. + conn.execute( + "DELETE FROM chunks WHERE programm_id = ? AND model = ?", + (programm_id, EMBEDDING_MODEL), + ) # Extract and chunk pages = extract_text_with_pages(pdf_path) @@ -422,8 +474,8 @@ def index_programm(programm_id: str, pdf_dir: Path) -> int: embedding_blob = json.dumps(embedding).encode() conn.execute(""" - INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO chunks (programm_id, partei, typ, seite, text, embedding, bundesland, model) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( programm_id, info["partei"], @@ -432,6 +484,7 @@ def index_programm(programm_id: str, pdf_dir: Path) -> int: chunk_text_content, embedding_blob, info.get("bundesland"), # NULL für Grundsatzprogramme + EMBEDDING_MODEL, )) total_chunks += 1 except Exception as e: @@ -445,6 +498,38 @@ def index_programm(programm_id: str, pdf_dir: Path) -> int: return total_chunks +def create_assessment_embedding( + title: str, + zusammenfassung: Optional[str], + themen: Optional[list[str]], + bundesland: Optional[str] = None, +) -> tuple[Optional[bytes], Optional[str]]: + """Erzeuge ein Assessment-Embedding für Clustering (#105) und Ähnlichkeit (#108). + + Kombiniert Titel + Kurzfassung + Themen + Bundesland zu einem einzelnen + String und embedded ihn mit dem aktuellen WRITE-Modell. Gibt `(None, None)` + zurück wenn die Embedding-API fehlschlägt — das Backfill-Script zieht + solche Assessments später nach. + """ + parts = [title or ""] + if zusammenfassung: + parts.append(zusammenfassung) + if themen: + parts.append(", ".join(themen)) + if bundesland: + parts.append(f"Bundesland: {bundesland}") + text = "\n".join(p for p in parts if p).strip() + if not text: + return None, None + + try: + vec = create_embedding(text, model=EMBEDDING_MODEL) + return json.dumps(vec).encode(), EMBEDDING_MODEL + except Exception: + logger.exception("create_assessment_embedding failed") + return None, None + + def cosine_similarity(a: list[float], b: list[float]) -> float: """Calculate cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b)) @@ -471,14 +556,17 @@ def find_relevant_chunks( berücksichtigt. Wenn None, kein Filter. """ - query_embedding = create_embedding(query) + # Query-Embedding muss im selben Vektorraum wie die gespeicherten Chunks + # liegen — während der Migration #123 ist das EMBEDDING_MODEL_READ. + query_embedding = create_embedding(query, model=EMBEDDING_MODEL_READ) conn = sqlite3.connect(EMBEDDINGS_DB) conn.row_factory = sqlite3.Row - # Build query - sql = "SELECT * FROM chunks WHERE 1=1" - params = [] + # Build query — filtert auf das aktive READ-Modell, damit v3- und + # v4-Embeddings nicht gemischt werden (Cosine wäre Nonsens). + sql = "SELECT * FROM chunks WHERE model = ?" + params = [EMBEDDING_MODEL_READ] if parteien: placeholders = ",".join("?" * len(parteien)) diff --git a/app/mail.py b/app/mail.py new file mode 100644 index 0000000..f0d9071 --- /dev/null +++ b/app/mail.py @@ -0,0 +1,220 @@ +"""Mail-Sending + Daily-Digest für E-Mail-Benachrichtigungen (#124). + +Nutzt die Standard-Library `smtplib` (blockierend) in einem Thread-Executor, +damit kein zusätzlicher Dependency-Eintrag nötig ist. 1blu SMTP: + smtp.1blu.de:465 SSL, username = Postfachname (NICHT E-Mail!) +Credentials kommen aus settings.smtp_user / smtp_password via ENV. + +Unsubscribe-Token: HMAC-SHA256 von sub_id + secret, URL-sicher base64-encoded. +""" +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import hmac +import html +import logging +import smtplib +import ssl +from datetime import datetime +from email.message import EmailMessage + +from .config import settings + +logger = logging.getLogger(__name__) + + +# ─── Unsubscribe-Token ────────────────────────────────────────────────────── + +def _unsubscribe_token(sub_id: int) -> str: + """Erzeugt HMAC-Token für Unsubscribe-Link.""" + msg = str(sub_id).encode() + sig = hmac.new(settings.unsubscribe_secret.encode(), msg, hashlib.sha256).digest() + return base64.urlsafe_b64encode(sig).decode().rstrip("=")[:22] + + +def verify_unsubscribe_token(sub_id: int, token: str) -> bool: + """Verifiziert, dass der Token zur sub_id passt. Konstante Zeit.""" + expected = _unsubscribe_token(sub_id) + return hmac.compare_digest(expected, token) + + +def unsubscribe_url(sub_id: int) -> str: + token = _unsubscribe_token(sub_id) + return f"{settings.base_url}/unsubscribe/{sub_id}/{token}" + + +# ─── SMTP-Send ────────────────────────────────────────────────────────────── + +def _send_sync(to_email: str, subject: str, text_body: str, html_body: str) -> None: + """Blockierender Send via smtplib.""" + if not settings.smtp_host or not settings.smtp_user: + raise RuntimeError("SMTP nicht konfiguriert (settings.smtp_host/user leer)") + + msg = EmailMessage() + msg["From"] = f"{settings.smtp_from_name} <{settings.smtp_from_email}>" + msg["To"] = to_email + msg["Subject"] = subject + msg.set_content(text_body) + msg.add_alternative(html_body, subtype="html") + + ctx = ssl.create_default_context() + with smtplib.SMTP_SSL(settings.smtp_host, settings.smtp_port, context=ctx) as server: + server.login(settings.smtp_user, settings.smtp_password) + server.send_message(msg) + + +async def send_mail(to_email: str, subject: str, text_body: str, html_body: str) -> None: + """Async-Wrapper — SMTP-Call läuft im Thread-Executor.""" + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, _send_sync, to_email, subject, text_body, html_body) + + +# ─── Digest-Komposition ───────────────────────────────────────────────────── + +def _filter_assessments(rows: list[dict], bundesland: str | None, partei: str | None, since: str | None) -> list[dict]: + """Filtert Assessment-Rows nach Abo-Kriterien.""" + result = [] + for r in rows: + if bundesland and (r.get("bundesland") or "") != bundesland: + continue + if partei: + fraktionen = r.get("fraktionen") or [] + if not any(partei.upper() in (f or "").upper() for f in fraktionen): + continue + if since and (r.get("updated_at") or "") <= since: + continue + result.append(r) + return result + + +def compose_digest(sub: dict, assessments: list[dict]) -> tuple[str, str, str]: + """Baut Subject, Text- und HTML-Body für einen Digest. + + Returns: (subject, text_body, html_body) + """ + n = len(assessments) + filter_label_parts = [] + if sub.get("bundesland"): + filter_label_parts.append(sub["bundesland"]) + if sub.get("partei"): + filter_label_parts.append(sub["partei"]) + filter_label = " · ".join(filter_label_parts) if filter_label_parts else "alle Bundesländer & Parteien" + + subject = f"[GWÖ-Antragsprüfer] {n} neue Bewertung{'en' if n != 1 else ''} — {filter_label}" + + unsub = unsubscribe_url(sub["id"]) + + # Plaintext + text_lines = [ + f"Neue Antragsbewertungen — Filter: {filter_label}", + "=" * 60, + "", + ] + for a in assessments[:20]: + score = a.get("gwoe_score") + title = a.get("title") or a.get("drucksache") + emp = a.get("empfehlung") or "" + fraktionen = ", ".join(a.get("fraktionen") or []) + url = f"{settings.base_url}/?drucksache={a.get('drucksache')}" + text_lines.append(f"• {title}") + text_lines.append(f" Score: {score}/10 — {emp}") + text_lines.append(f" Fraktionen: {fraktionen}") + text_lines.append(f" {url}") + text_lines.append("") + if n > 20: + text_lines.append(f"… und {n - 20} weitere. Alle anzeigen: {settings.base_url}") + text_lines.append("") + text_lines.append("—") + text_lines.append(f"Abo verwalten: {settings.base_url}") + text_lines.append(f"Abbestellen: {unsub}") + text_body = "\n".join(text_lines) + + # HTML + html_items = [] + for a in assessments[:20]: + score = a.get("gwoe_score") + title = html.escape(a.get("title") or a.get("drucksache") or "") + emp = html.escape(a.get("empfehlung") or "") + fraktionen = html.escape(", ".join(a.get("fraktionen") or [])) + zus = html.escape((a.get("antrag_zusammenfassung") or "")[:200]) + url = html.escape(f"{settings.base_url}/?drucksache={a.get('drucksache')}") + html_items.append(f""" +
""") + + more_link = "" + if n > 20: + more_link = f'… und {n - 20} weitere ansehen
' + + html_body = f""" + +Filter: {html.escape(filter_label)}
+{''.join(html_items)} +{more_link} ++ Abo verwalten · + Abbestellen +
+""" + + return subject, text_body, html_body + + +async def run_daily_digest() -> dict: + """Daily-Digest-Runner. Iteriert alle due Abos und verschickt. + + Gibt Statistik zurück: {sent, failed, skipped_empty}. + """ + from .database import ( + get_all_assessments, + get_all_subscriptions_due, + mark_subscription_sent, + ) + + stats = {"sent": 0, "failed": 0, "skipped_empty": 0} + + subs = await get_all_subscriptions_due("daily") + if not subs: + logger.info("run_daily_digest: keine due subscriptions") + return stats + + all_assessments = await get_all_assessments(None) + + for sub in subs: + matches = _filter_assessments( + all_assessments, + bundesland=sub.get("bundesland"), + partei=sub.get("partei"), + since=sub.get("last_sent"), + ) + if not matches: + stats["skipped_empty"] += 1 + # Last-sent trotzdem setzen, damit wir nicht jede Minute wieder testen + await mark_subscription_sent(sub["id"]) + continue + + try: + subject, text_body, html_body = compose_digest(sub, matches) + await send_mail(sub["email"], subject, text_body, html_body) + await mark_subscription_sent(sub["id"]) + stats["sent"] += 1 + logger.info("digest sent to %s (%d items)", sub["email"], len(matches)) + except Exception: + logger.exception("digest failed for sub_id=%s", sub["id"]) + stats["failed"] += 1 + + return stats + + +if __name__ == "__main__": + # python -m app.mail → führt den Daily-Digest-Lauf aus + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + result = asyncio.run(run_daily_digest()) + print(f"Digest-Lauf fertig: {result}") diff --git a/app/parlamente.py b/app/parlamente.py index 573a143..974a514 100644 --- a/app/parlamente.py +++ b/app/parlamente.py @@ -21,19 +21,43 @@ class Drucksache: datum: str # ISO date link: str # PDF URL bundesland: str - typ: str = "Antrag" # Antrag, Anfrage, Beschlussempfehlung, etc. + typ: str = "Antrag" # Original-Typ vom Landtag (z.B. "Kleine Anfrage", "Gesetzentwurf") + typ_normiert: str = "" # Normierter Typ (wird automatisch gesetzt) + + def __post_init__(self): + from .drucksache_typen import normalize_typ + if not self.typ_normiert: + self.typ_normiert = normalize_typ(self.typ) class ParlamentAdapter(ABC): """Base adapter for searching parliament documents.""" - + bundesland: str name: str - + filter_abstimmbar: bool = True # #127: nur abstimmbare Typen zurückgeben + @abstractmethod async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search for documents matching query.""" pass + + def _filter_abstimmbar(self, docs: list[Drucksache]) -> list[Drucksache]: + """Filtert nicht-abstimmbare Drucksachen heraus (#127). + + Wird von Adaptern am Ende von search() aufgerufen. Lässt + nur Anträge, Gesetzentwürfe, Änderungsanträge etc. durch. + """ + if not self.filter_abstimmbar: + return docs + from .drucksache_typen import ist_abstimmbar + filtered = [d for d in docs if ist_abstimmbar(d.typ_normiert)] + if len(filtered) < len(docs): + logger.debug( + "%s: %d von %d Drucksachen als nicht-abstimmbar gefiltert", + self.bundesland, len(docs) - len(filtered), len(docs), + ) + return filtered @abstractmethod async def get_document(self, drucksache: str) -> Optional[Drucksache]: @@ -87,9 +111,16 @@ class NRWAdapter(ParlamentAdapter): return (parts[0], filter_terms, False) def _matches_all_terms(self, doc: 'Drucksache', terms: list[str], is_exact: bool) -> bool: - """Check if document matches all search terms (AND logic).""" + """Check if document matches all search terms (AND logic). + + Empty, whitespace-only, or bare-wildcard (``*``) terms are treated as + match-all so that the monitoring path (query="", " ", "*") never filters + out valid results fetched from OPAL. + """ + # Wildcard / empty query — match everything + if not terms or all(not t.strip() or t.strip() == "*" for t in terms): + return True searchable = f"{doc.title} {doc.drucksache} {' '.join(doc.fraktionen)} {doc.typ}".lower() - if is_exact: # Exact phrase must appear return terms[0] in searchable @@ -100,9 +131,18 @@ class NRWAdapter(ParlamentAdapter): async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search NRW Landtag documents via OPAL portal.""" results = [] - + # Parse query for AND logic api_query, filter_terms, is_exact = self._parse_query(query) + + # OPAL rejects empty dokNum with 0 results. For the monitoring path + # (query="" / " " / "*"), substitute the current year so OPAL returns + # recent documents. The filter_terms list stays [""] / [" "] / ["*"], + # and _matches_all_terms with "" or " " or "*" matches every document, + # so nothing is filtered out client-side. + if not api_query.strip() or api_query.strip() in ("*",): + from datetime import date as _date + api_query = str(_date.today().year) async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: @@ -3171,7 +3211,7 @@ class SaarlandAdapter(ParlamentAdapter): return data.get("FilteredResult", []) or [] except Exception: logger.exception("SL search request error") - return [] + raise async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Volltextsuche über die aktuelle Wahlperiode, gefiltert auf Anträge. @@ -3181,10 +3221,15 @@ class SaarlandAdapter(ParlamentAdapter): und Gesetzentwürfe), und kürzt auf ``limit``. Sortierung kommt relevance-based vom Server — für die UI ist Relevanz zu einer Query meist wertvoller als Date-DESC. + + Netzwerkfehler (Timeout, ConnectError, HTTP-Fehler) werden nicht + geschluckt — sie propagieren, damit das Monitoring sie als + ``errors``-Text in ``monitoring_daily_summary`` erfassen kann. """ async with self._make_client() as client: # Take großzügig, weil der Antrag-Filter ~30-50% der Hits drosselt take = max(limit * 5, 30) + # _post_search re-raises alle Netzwerkfehler (Fix #142) items = await self._post_search(client, query, skip=0, take=take) results: list[Drucksache] = [] diff --git a/app/redline_utils.py b/app/redline_utils.py new file mode 100644 index 0000000..f3f5951 --- /dev/null +++ b/app/redline_utils.py @@ -0,0 +1,88 @@ +"""Redline-Parser-Hilfsfunktionen — keine FastAPI-Abhängigkeiten. + +Wird von app.main._row_to_detail() und von Tests direkt importiert. +""" +from __future__ import annotations + +import re +from urllib.parse import quote_plus + + +def parse_redline_segments(vorschlag: str | None) -> list[dict]: + """Parst §INS§text§INS§/§DEL§text§DEL§-Marker sowie **text**- und + ~~text~~-Markdown in eine Liste von {type, text}-Segmenten (ctx/ins/del). + + Toleriert beide Formate gleichzeitig. Unausgewogene Marker bleiben als ctx. + Leerer oder None-Input liefert []. + + Beispiel: + >>> parse_redline_segments("§ 3 §DEL§alt§DEL§ §INS§neu§INS§ Ende") + [{'type': 'ctx', 'text': '§ 3 '}, {'type': 'del', 'text': 'alt'}, + {'type': 'ctx', 'text': ' '}, {'type': 'ins', 'text': 'neu'}, + {'type': 'ctx', 'text': ' Ende'}] + """ + if not vorschlag: + return [] + + # Normalisierung: §INS§...§INS§ und §DEL§...§DEL§ → interne Tags + text = vorschlag + text = re.sub(r"§INS§(.*?)§INS§", r"\1", text, flags=re.DOTALL) + text = re.sub(r"§DEL§(.*?)§DEL§", r"0?d[i-1]:l,v.x1=i
0)for(i=0;i=n)&&(e=n);else{let r=-1;for(let i of t)null!=(i=n(i,++r,t))&&(e=i)&&(e=i)}return e}function tt(t,n){let e,r=-1,i=-1;if(void 0===n)for(const n of t)++i,null!=n&&(e =0;)(r=i[o])&&(a&&4^r.compareDocumentPosition(a)&&a.parentNode.insertBefore(r,a),a=r);return this},sort:function(t){function n(n,e){return n&&e?t(n.__data__,e.__data__):!n-!e}t||(t=un);for(var e=this._groups,r=e.length,i=new Array(r),o=0;o >1)+h+t+M+S.slice(A);break;default:t=S+h+t+M}return u(t)}return y=void 0===y?6:/[gprs]/.test(_)?Math.max(1,Math.min(21,y)):Math.max(0,Math.min(20,y)),M.toString=function(){return t+""},M}return{format:l,formatPrefix:function(t,n){var e=l(((t=Jc(t)).type="f",t)),r=3*Math.max(-8,Math.min(8,Math.floor(Zc(n)/3))),i=Math.pow(10,-r),o=uf[8+r/3];return function(t){return e(i*t)+o}}}}function ff(n){return of=cf(n),t.format=of.format,t.formatPrefix=of.formatPrefix,of}function sf(t){return Math.max(0,-Zc(Math.abs(t)))}function lf(t,n){return Math.max(0,3*Math.max(-8,Math.min(8,Math.floor(Zc(n)/3)))-Zc(Math.abs(t)))}function hf(t,n){return t=Math.abs(t),n=Math.abs(n)-t,Math.max(0,Zc(n)-Zc(t))+1}t.format=void 0,t.formatPrefix=void 0,ff({thousands:",",grouping:[3],currency:["$",""]});var df=1e-6,pf=1e-12,gf=Math.PI,yf=gf/2,vf=gf/4,_f=2*gf,bf=180/gf,mf=gf/180,xf=Math.abs,wf=Math.atan,Mf=Math.atan2,Tf=Math.cos,Af=Math.ceil,Sf=Math.exp,Ef=Math.hypot,Nf=Math.log,kf=Math.pow,Cf=Math.sin,Pf=Math.sign||function(t){return t>0?1:t<0?-1:0},zf=Math.sqrt,$f=Math.tan;function Df(t){return t>1?0:t<-1?gf:Math.acos(t)}function Rf(t){return t>1?yf:t<-1?-yf:Math.asin(t)}function Ff(t){return(t=Cf(t/2))*t}function qf(){}function Uf(t,n){t&&Of.hasOwnProperty(t.type)&&Of[t.type](t,n)}var If={Feature:function(t,n){Uf(t.geometry,n)},FeatureCollection:function(t,n){for(var e=t.features,r=-1,i=e.length;++r=0?1:-1,i=r*e,o=Tf(n=(n*=mf)/2+vf),a=Cf(n),u=Vf*a,c=Gf*o+u*Tf(i),f=u*r*Cf(i);as.add(Mf(f,c)),Xf=t,Gf=o,Vf=a}function ds(t){return[Mf(t[1],t[0]),Rf(t[2])]}function ps(t){var n=t[0],e=t[1],r=Tf(e);return[r*Tf(n),r*Cf(n),Cf(e)]}function gs(t,n){return t[0]*n[0]+t[1]*n[1]+t[2]*n[2]}function ys(t,n){return[t[1]*n[2]-t[2]*n[1],t[2]*n[0]-t[0]*n[2],t[0]*n[1]-t[1]*n[0]]}function vs(t,n){t[0]+=n[0],t[1]+=n[1],t[2]+=n[2]}function _s(t,n){return[t[0]*n,t[1]*n,t[2]*n]}function bs(t){var n=zf(t[0]*t[0]+t[1]*t[1]+t[2]*t[2]);t[0]/=n,t[1]/=n,t[2]/=n}var ms,xs,ws,Ms,Ts,As,Ss,Es,Ns,ks,Cs,Ps,zs,$s,Ds,Rs,Fs={point:qs,lineStart:Is,lineEnd:Os,polygonStart:function(){Fs.point=Bs,Fs.lineStart=Ys,Fs.lineEnd=Ls,rs=new T,cs.polygonStart()},polygonEnd:function(){cs.polygonEnd(),Fs.point=qs,Fs.lineStart=Is,Fs.lineEnd=Os,as<0?(Wf=-(Kf=180),Zf=-(Qf=90)):rs>df?Qf=90:rs<-df&&(Zf=-90),os[0]=Wf,os[1]=Kf},sphere:function(){Wf=-(Kf=180),Zf=-(Qf=90)}};function qs(t,n){is.push(os=[Wf=t,Kf=t]),n1;)i-=2;for(let t=2;t0){if(n>=this.ymax)return null;(i=(this.ymax-n)/r)0){if(t>=this.xmax)return null;(i=(this.xmax-t)/e)this.xmax?2:0)|(n9999?"+"+Ku(n,6):Ku(n,4))+"-"+Ku(t.getUTCMonth()+1,2)+"-"+Ku(t.getUTCDate(),2)+(o?"T"+Ku(e,2)+":"+Ku(r,2)+":"+Ku(i,2)+"."+Ku(o,3)+"Z":i?"T"+Ku(e,2)+":"+Ku(r,2)+":"+Ku(i,2)+"Z":r||e?"T"+Ku(e,2)+":"+Ku(r,2)+"Z":"")}function Ju(t){var n=new RegExp('["'+t+"\n\r]"),e=t.charCodeAt(0);function r(t,n){var r,i=[],o=t.length,a=0,u=0,c=o<=0,f=!1;function s(){if(c)return Hu;if(f)return f=!1,ju;var n,r,i=a;if(t.charCodeAt(i)===Xu){for(;a++=v)<<1|t>=y)&&(c=p[p.length-1],p[p.length-1]=p[p.length-1-f],p[p.length-1-f]=c)}else{var _=t-+this._x.call(null,g.data),b=n-+this._y.call(null,g.data),m=_*_+b*b;if(m