"""Topic × motion matching for the "Aktuelle Themen" dashboard (#170 phase 2).

Crosses news-article embeddings (``news_articles.summary_embedding``) with
motion embeddings (``assessments.summary_embedding``) via cosine similarity
and returns, per news article, the top-K best-matching motions (plus the
reverse and clustered views built on the same comparison).

Reuse:
- ``embeddings.cosine_similarity`` for the vector comparison.
- Both tables use the same embedding-model vector space (qwen v4), which is
  what makes the direct cross comparison possible.
- Rows are filtered on the ``embedding_model`` column (only
  ``embeddings.EMBEDDING_MODEL_READ`` is used), in case a re-embedding
  migration is running.

**Performance cache:** ``aggregate_top_themen`` and ``aggregate_news_cluster``
are expensive (cosine over ~300 news × ~100 assessments = 30k operations).
Their results are therefore kept in an in-process TTL cache: identical filter
tuples are served from memory for 60 s and recomputed afterwards. The cache
starts empty on module import, so no stale entries survive a deploy. Cached
result dicts are handed out to every caller as-is and must not be mutated.
"""

from __future__ import annotations

import json
import logging
import sqlite3
import time
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Optional

logger = logging.getLogger(__name__)

# Maps cache key -> (time.monotonic() expiry, cached result dict).
_CACHE: dict[tuple, tuple[float, dict]] = {}
_CACHE_TTL_SECONDS = 60

# Columns selected from ``news_articles`` by every news-facing query.
_NEWS_COLS = ("url", "titel", "summary", "datum", "source", "ressort", "tags")
# Base columns selected from ``assessments``; callers append extra columns.
_ANTRAG_COLS = (
    "drucksache", "title", "bundesland", "fraktionen",
    "gwoe_score", "empfehlung", "datum",
)


def _cache_get(key: tuple) -> Optional[dict]:
    """Return the cached value for ``key``, or None if absent or expired.

    An expired entry is evicted when it is looked up.
    """
    entry = _CACHE.get(key)
    if entry is None:
        return None
    expires_at, value = entry
    if time.monotonic() > expires_at:
        _CACHE.pop(key, None)
        return None
    return value


def _cache_set(key: tuple, value: dict) -> None:
    """Store ``value`` under ``key`` for ``_CACHE_TTL_SECONDS`` seconds.

    Also evicts every other expired entry: ``_cache_get`` only evicts the key
    it is asked for, so keys built from filter values that are never requested
    again (e.g. an old ``single_date``) would otherwise stay in memory forever.
    """
    now = time.monotonic()
    # Iterate over a snapshot because entries are popped inside the loop.
    for stale_key, (expires_at, _) in list(_CACHE.items()):
        if now > expires_at:
            _CACHE.pop(stale_key, None)
    _CACHE[key] = (now + _CACHE_TTL_SECONDS, value)


def cache_clear() -> None:
    """Clear the TTL cache.

    Call e.g. after a news-aggregator run so new articles show up immediately.
    """
    _CACHE.clear()


def _resolve_db_path(db_path: Optional[Path]) -> Path:
    """Return ``db_path`` as a Path, falling back to ``settings.db_path``.

    The settings import is deferred so that callers passing an explicit path
    (e.g. tests) do not need the project configuration.
    """
    if db_path:
        return Path(db_path)
    from .config import settings

    return Path(settings.db_path)


def _parse_iso_ts(datum) -> Optional[float]:
    """Parse an ISO-8601 date string into a POSIX timestamp.

    A trailing ``Z`` is accepted as UTC. Returns None for None, non-strings,
    and unparsable or out-of-range values.
    """
    try:
        return datetime.fromisoformat(datum.replace("Z", "+00:00")).timestamp()
    except (ValueError, TypeError, AttributeError, OverflowError):
        return None


def _json_list(raw):
    """Decode a JSON-encoded list column.

    NULL/empty values and malformed JSON yield ``[]`` instead of raising, so a
    single broken row cannot take down a whole aggregation.
    """
    if not raw:
        return []
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        logger.debug("Ignoring malformed JSON list column: %r", raw)
        return []


def _load_embeddings(
    db_path: Path,
    table: str,
    select_cols: list[str],
    where_extra: str = "",
    params: tuple = (),
) -> list[dict]:
    """Load all rows of ``table`` that carry a usable ``summary_embedding``.

    ``table``, ``select_cols`` and ``where_extra`` are interpolated into the
    SQL text, so they must only come from hard-coded call sites; values
    belong in ``params``.

    Rows whose ``embedding_model`` differs from
    ``embeddings.EMBEDDING_MODEL_READ``, or whose embedding is not valid
    JSON, are skipped. Each returned dict holds the selected columns,
    ``summary_embedding``, ``embedding_model`` and the decoded vector under
    ``_vec``. Returns ``[]`` if the database file does not exist.
    """
    if not Path(db_path).exists():
        return []
    from . import embeddings as emb

    cols = ", ".join(select_cols)
    sql = (
        f"SELECT {cols}, summary_embedding, embedding_model "
        f"FROM {table} "
        f"WHERE summary_embedding IS NOT NULL {where_extra}"
    )
    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()

    out = []
    for row in rows:
        if row["embedding_model"] != emb.EMBEDDING_MODEL_READ:
            continue
        try:
            vec = json.loads(row["summary_embedding"])
        except (json.JSONDecodeError, TypeError):
            logger.debug("Skipping %s row with undecodable embedding", table)
            continue
        record = dict(row)
        record["_vec"] = vec
        out.append(record)
    return out


def _load_single_vec(
    path: Path, table: str, key_col: str, key: str, model: str
) -> Optional[list]:
    """Load and decode the embedding of the row where ``key_col == key``.

    ``table``/``key_col`` are interpolated into SQL and must be hard-coded.
    Returns None if the row is missing, has no embedding, was embedded with a
    model other than ``model``, or the stored JSON cannot be decoded.
    """
    conn = sqlite3.connect(str(path))
    try:
        row = conn.execute(
            f"SELECT summary_embedding, embedding_model FROM {table} "
            f"WHERE {key_col}=?",
            (key,),
        ).fetchone()
    finally:
        conn.close()
    if not row or not row[0] or row[1] != model:
        return None
    try:
        return json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return None


def _antrag_match(a: dict, sim: float, *, with_themen: bool = False) -> dict:
    """Build the public match dict for assessment row ``a`` at similarity ``sim``."""
    match = {
        "drucksache": a["drucksache"],
        "title": a["title"],
        "bundesland": a["bundesland"],
        "fraktionen": _json_list(a["fraktionen"]),
        "gwoe_score": a["gwoe_score"],
        "empfehlung": a["empfehlung"],
    }
    if with_themen:
        match["themen"] = _json_list(a["themen"])
    match["datum"] = a["datum"]
    match["similarity"] = round(sim, 3)
    return match


def _rank_antraege(
    query_vec: list,
    assessments: list[dict],
    min_similarity: float,
    cosine: Callable[[list, list], float],
    *,
    with_themen: bool = False,
) -> list[dict]:
    """Score ``assessments`` against ``query_vec`` and return matches best-first.

    Assessments with similarity below ``min_similarity`` are dropped.
    ``cosine`` is the similarity function (``embeddings.cosine_similarity``).
    """
    scored = []
    for a in assessments:
        sim = cosine(query_vec, a["_vec"])
        if sim < min_similarity:
            continue
        scored.append(_antrag_match(a, sim, with_themen=with_themen))
    scored.sort(key=lambda m: m["similarity"], reverse=True)
    return scored


def _news_payload(n: dict) -> dict:
    """Build the public dict for news row ``n`` (tags decoded, never raising)."""
    return {
        "url": n["url"],
        "titel": n["titel"],
        "summary": n["summary"],
        "datum": n["datum"],
        "source": n["source"],
        "ressort": n["ressort"],
        "tags": _json_list(n["tags"]),
    }


def _fresh_news(news_rows: list[dict], cutoff: float) -> list[dict]:
    """Keep news rows whose ``datum`` parses and is not older than ``cutoff``.

    Rows with an unparsable date are dropped. Each kept row gets its parsed
    timestamp stored under ``_ts``.
    """
    fresh = []
    for n in news_rows:
        ts = _parse_iso_ts(n["datum"])
        if ts is None or ts < cutoff:
            continue
        n["_ts"] = ts
        fresh.append(n)
    return fresh


def find_anträge_for_news(
    news_url: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the top-K motions most similar to the news article at ``news_url``.

    ``min_similarity`` is the cut-off for "fits reasonably well"; 0.4 is the
    empirically found point from which qwen-v4 embeddings match semantically.

    Returns ``[]`` if the database is missing, the URL is unknown, or the
    article has no embedding from the current read model. Each match carries
    the motion metadata (with decoded ``fraktionen``/``themen``) and
    ``similarity`` rounded to 3 places, best match first.
    """
    path = _resolve_db_path(db_path)
    if not path.exists():
        return []
    from . import embeddings as emb

    news_vec = _load_single_vec(
        path, "news_articles", "url", news_url, emb.EMBEDDING_MODEL_READ
    )
    if news_vec is None:
        return []

    assessments = _load_embeddings(path, "assessments", [*_ANTRAG_COLS, "themen"])
    ranked = _rank_antraege(
        news_vec, assessments, min_similarity, emb.cosine_similarity,
        with_themen=True,
    )
    return ranked[:top_k]


def find_news_for_antrag(
    drucksache: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    days_window: int = 90,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the top-K news articles most similar to motion ``drucksache``.

    News older than ``days_window`` days (default 90) are excluded so that
    the press coverage is current. News whose date cannot be parsed are
    deliberately kept rather than dropped.

    Returns ``[]`` if the database is missing or the motion has no embedding
    from the current read model. Each result holds the news fields (tags
    decoded) plus ``similarity`` rounded to 3 places, best match first.
    """
    path = _resolve_db_path(db_path)
    if not path.exists():
        return []
    from . import embeddings as emb

    antrag_vec = _load_single_vec(
        path, "assessments", "drucksache", drucksache, emb.EMBEDDING_MODEL_READ
    )
    if antrag_vec is None:
        return []

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news = _load_embeddings(path, "news_articles", list(_NEWS_COLS))

    scored = []
    for n in news:
        # Cheap date check first; unparsable dates pass through on purpose.
        news_ts = _parse_iso_ts(n["datum"])
        if news_ts is not None and news_ts < cutoff:
            continue
        sim = emb.cosine_similarity(antrag_vec, n["_vec"])
        if sim < min_similarity:
            continue
        scored.append({**_news_payload(n), "similarity": round(sim, 3)})

    scored.sort(key=lambda x: x["similarity"], reverse=True)
    return scored[:top_k]


def compute_relevance(matches: list[dict]) -> dict:
    """Aggregate a relevance score and explanation from a match list.

    Score = max(gwoe_score × similarity) over all matches, rounded to two
    places; a missing ``gwoe_score`` or ``similarity`` counts as 0.
    Domain: 0..10 (same scale as the GWÖ score).

    Level thresholds (applied to the unrounded score):
    - score >= 4.0 → "high" (at least one strong GWÖ match)
    - score >= 2.5 → "mid"  (fits, but low GWÖ score or weak match)
    - score >  0   → "low"  (only a weak fit)
    - otherwise    → "none" (no GWÖ match at all)

    ``reason`` is a compact German explanation naming the strongest match.
    No LLM call — pure data synthesis.
    """
    if not matches:
        return {
            "score": 0.0,
            "level": "none",
            "reason": "Keine GWÖ-bewerteten Anträge passen zu dieser News.",
        }

    # max() picks the first of several equally strong matches.
    best_score, best_match = max(
        (
            ((m.get("gwoe_score") or 0.0) * (m.get("similarity") or 0.0), m)
            for m in matches
        ),
        key=lambda pair: pair[0],
    )

    if best_score >= 4.0:
        level = "high"
    elif best_score >= 2.5:
        level = "mid"
    elif best_score > 0:
        level = "low"
    else:
        level = "none"

    fraktionen = ", ".join(best_match.get("fraktionen") or [])
    fr_clause = f" ({fraktionen})" if fraktionen else ""
    titel = (best_match.get("title") or "").strip()
    if len(titel) > 70:
        titel = titel[:67] + "…"
    reason = (
        f"GWÖ-{best_match.get('gwoe_score')}/10-Antrag „{titel}“"
        f"{fr_clause} passt mit Similarity {best_match.get('similarity')}"
    )
    if len(matches) > 1:
        reason += f" — {len(matches) - 1} weitere(r) Match(es)."
    else:
        reason += "."

    return {
        "score": round(best_score, 2),
        "level": level,
        "reason": reason,
    }


def aggregate_top_themen(
    days_window: int = 7,
    top_k: int = 10,
    min_similarity: float = 0.4,
    matches_per_news: int = 3,
    only_relevant: bool = False,
    single_date: Optional[str] = None,
    db_path: Optional[Path] = None,
) -> dict:
    """Most recent news (last N days), each with its best-matching motions.

    This is the primary dashboard endpoint; results are cached for 60 s.
    ``single_date`` (a ``YYYY-MM-DD`` prefix) takes precedence over
    ``days_window`` and restricts the news to that day. With
    ``only_relevant`` only buckets of relevance level "high"/"mid" are kept.

    Returns::

        {
          "buckets": [{
            "news": {url, titel, summary, datum, source, ressort, tags},
            "matches": [{drucksache, title, gwoe_score, similarity, ...}],
            "relevance": {score, level, reason},
          }, ...],
          "n_total_news": int,   # news rows with a usable embedding
          "n_in_window": int,    # of those, inside the date filter
          "n_shown": int,        # buckets actually returned
          "filter": {...},
        }

    The same shape (with empty/zero values) is returned if the database
    file is missing.
    """
    # Cache key: db_path only differs for test overrides; default is "".
    cache_key = (
        "top_themen", days_window, top_k, round(min_similarity, 3),
        matches_per_news, only_relevant, single_date, str(db_path or ""),
    )
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    filter_info = {
        "days_window": days_window,
        "top_k": top_k,
        "min_similarity": min_similarity,
        "matches_per_news": matches_per_news,
        "only_relevant": only_relevant,
        "single_date": single_date,
    }
    path = _resolve_db_path(db_path)
    if not path.exists():
        return {
            "buckets": [],
            "n_total_news": 0,
            "n_in_window": 0,
            "n_shown": 0,
            "filter": filter_info,
        }
    from . import embeddings as emb

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news_rows = _load_embeddings(path, "news_articles", list(_NEWS_COLS))

    fresh = []
    for n in news_rows:
        news_ts = _parse_iso_ts(n["datum"])
        if news_ts is None:
            continue
        if single_date:
            # single_date wins over the rolling window.
            if not n["datum"].startswith(single_date):
                continue
        elif news_ts < cutoff:
            continue
        n["_ts"] = news_ts
        fresh.append(n)
    n_in_window = len(fresh)

    # Newest first, then cut to top_k before the expensive matching.
    fresh.sort(key=lambda x: x["_ts"], reverse=True)
    fresh = fresh[:top_k]

    assessments = _load_embeddings(path, "assessments", list(_ANTRAG_COLS))
    buckets = []
    for n in fresh:
        top_matches = _rank_antraege(
            n["_vec"], assessments, min_similarity, emb.cosine_similarity
        )[:matches_per_news]
        relevance = compute_relevance(top_matches)
        if only_relevant and relevance["level"] not in ("high", "mid"):
            continue
        buckets.append({
            "news": _news_payload(n),
            "matches": top_matches,
            "relevance": relevance,
        })

    # Sort by relevance level (high > mid > low > none), then by relevance
    # score, then by the raw ``datum`` string — all descending.
    level_rank = {"high": 3, "mid": 2, "low": 1, "none": 0}
    buckets.sort(
        key=lambda b: (
            level_rank.get(b["relevance"]["level"], 0),
            b["relevance"]["score"],
            b["news"]["datum"],
        ),
        reverse=True,
    )

    result = {
        "buckets": buckets,
        "n_total_news": len(news_rows),
        "n_in_window": n_in_window,
        "n_shown": len(buckets),
        "filter": filter_info,
    }
    _cache_set(cache_key, result)
    return result


def aggregate_themen_zeitreihe(
    days_window: int = 30,
    db_path: Optional[Path] = None,
) -> dict:
    """News volume per (day, source) over the last N days, for a stacked-area chart.

    No motion matching — only the news activity per source, so the dashboard
    can show which sources were how active. Rows with a missing or
    unparsable ``datum`` are ignored; the day is the first 10 characters of
    the stored ``datum`` string.

    Returns ``{"buckets": [days...], "sources": [sources...],
    "series": {source: [count per day, aligned with buckets]}}``, with days
    and sources sorted ascending.
    """
    path = _resolve_db_path(db_path)
    if not path.exists():
        return {"buckets": [], "sources": [], "series": {}}

    cutoff_ts = datetime.now(timezone.utc).timestamp() - days_window * 86400
    conn = sqlite3.connect(str(path))
    try:
        rows = conn.execute("SELECT datum, source FROM news_articles").fetchall()
    finally:
        conn.close()

    counts: defaultdict[tuple[str, str], int] = defaultdict(int)
    sources_seen: set[str] = set()
    days_seen: set[str] = set()
    for datum, source in rows:
        ts = _parse_iso_ts(datum)
        if ts is None or ts < cutoff_ts:
            continue
        day = datum[:10]  # YYYY-MM-DD prefix of the stored string
        sources_seen.add(source)
        days_seen.add(day)
        counts[(day, source)] += 1

    days_sorted = sorted(days_seen)
    sources_sorted = sorted(sources_seen)
    series = {
        s: [counts[(d, s)] for d in days_sorted]
        for s in sources_sorted
    }
    return {
        "buckets": days_sorted,
        "sources": sources_sorted,
        "series": series,
    }


def _greedy_clusters(
    items: list[dict],
    threshold: float,
    min_size: int,
    cosine: Callable[[list, list], float],
) -> list[list[dict]]:
    """Greedily cluster ``items`` around seeds by embedding similarity.

    Walking ``items`` in order, each not-yet-assigned item seeds a cluster
    and absorbs every later unassigned item whose similarity to the seed is
    >= ``threshold``. Clusters smaller than ``min_size`` are discarded; their
    members stay assigned and are not reconsidered.
    """
    assigned = [False] * len(items)
    clusters = []
    for i, seed in enumerate(items):
        if assigned[i]:
            continue
        assigned[i] = True
        members = [seed]
        for j in range(i + 1, len(items)):
            if assigned[j]:
                continue
            if cosine(seed["_vec"], items[j]["_vec"]) >= threshold:
                members.append(items[j])
                assigned[j] = True
        if len(members) >= min_size:
            clusters.append(members)
    return clusters


def _centroid(vectors: list[list[float]]) -> list[float]:
    """Component-wise mean of equally long vectors."""
    count = len(vectors)
    return [sum(column) / count for column in zip(*vectors)]


def aggregate_news_cluster(
    days_window: int = 7,
    intra_threshold: float = 0.55,
    antrag_threshold: float = 0.4,
    min_cluster_size: int = 2,
    db_path: Optional[Path] = None,
) -> dict:
    """News-to-news clustering over embeddings. Cached (60 s TTL).

    Greedy: news are processed newest first; each unclustered news becomes a
    cluster seed and absorbs all other unclustered news with cosine >=
    ``intra_threshold``. Clusters with fewer than ``min_cluster_size``
    members are discarded (a single-member cluster would duplicate
    ``aggregate_top_themen``).

    Per cluster: the top-3 motions matching the cluster centroid with
    similarity >= ``antrag_threshold``, plus the 5 most frequent tags.
    Clusters are sorted by size, then by their best motion similarity, both
    descending. ``n_total_news`` counts the news inside the date window.
    """
    cache_key = (
        "cluster", days_window, round(intra_threshold, 3),
        round(antrag_threshold, 3), min_cluster_size, str(db_path or ""),
    )
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    filter_info = {
        "days_window": days_window,
        "intra_threshold": intra_threshold,
        "antrag_threshold": antrag_threshold,
        "min_cluster_size": min_cluster_size,
    }
    path = _resolve_db_path(db_path)
    if not path.exists():
        return {"clusters": [], "n_total_news": 0, "filter": filter_info}
    from . import embeddings as emb

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    fresh = _fresh_news(
        _load_embeddings(path, "news_articles", list(_NEWS_COLS)), cutoff
    )
    fresh.sort(key=lambda x: x["_ts"], reverse=True)

    clusters = _greedy_clusters(
        fresh, intra_threshold, min_cluster_size, emb.cosine_similarity
    )

    assessments = _load_embeddings(path, "assessments", list(_ANTRAG_COLS))
    out_clusters = []
    for cluster in clusters:
        centroid = _centroid([m["_vec"] for m in cluster])
        antrag_matches = _rank_antraege(
            centroid, assessments, antrag_threshold, emb.cosine_similarity
        )

        tag_counts: Counter = Counter()
        for m in cluster:
            for tag in _json_list(m["tags"]):
                tag_counts[tag] += 1
        top_tags = [tag for tag, _ in tag_counts.most_common(5)]

        out_clusters.append({
            "size": len(cluster),
            "top_tags": top_tags,
            "members": [
                {
                    "url": m["url"],
                    "titel": m["titel"],
                    "datum": m["datum"],
                    "source": m["source"],
                    "ressort": m["ressort"],
                }
                for m in cluster
            ],
            "antrag_matches": antrag_matches[:3],
        })

    out_clusters.sort(
        key=lambda c: (
            c["size"],
            c["antrag_matches"][0]["similarity"] if c["antrag_matches"] else 0,
        ),
        reverse=True,
    )

    result = {
        "clusters": out_clusters,
        "n_total_news": len(fresh),
        "filter": filter_info,
    }
    _cache_set(cache_key, result)
    return result


def aggregate_top_antraege_with_news(
    min_gwoe_score: float = 8.0,
    days_window: int = 14,
    min_similarity: float = 0.4,
    top_k_news: int = 5,
    db_path: Optional[Path] = None,
) -> dict:
    """Reverse view: highly GWÖ-rated motions with their current news resonance.

    For every motion with ``gwoe_score >= min_gwoe_score``: the number of
    news from the last ``days_window`` days matching with similarity >=
    ``min_similarity``, plus the top ``top_k_news`` of them. Motions without
    any news match are still listed with ``news_count=0`` — as a hint
    "GWÖ top motion, currently without press coverage". Not cached.
    """
    filter_info = {
        "min_gwoe_score": min_gwoe_score,
        "days_window": days_window,
        "min_similarity": min_similarity,
        "top_k_news": top_k_news,
    }
    path = _resolve_db_path(db_path)
    if not path.exists():
        return {"antraege": [], "filter": filter_info}
    from . import embeddings as emb

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400

    assessments = _load_embeddings(
        path,
        "assessments",
        [*_ANTRAG_COLS, "antrag_zusammenfassung"],
        where_extra=" AND gwoe_score >= ?",
        params=(min_gwoe_score,),
    )
    fresh_news = _fresh_news(
        _load_embeddings(path, "news_articles", list(_NEWS_COLS)), cutoff
    )

    out = []
    for a in assessments:
        scored = []
        for n in fresh_news:
            sim = emb.cosine_similarity(a["_vec"], n["_vec"])
            if sim < min_similarity:
                continue
            scored.append({**_news_payload(n), "similarity": round(sim, 3)})
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        out.append({
            "drucksache": a["drucksache"],
            "title": a["title"],
            "bundesland": a["bundesland"],
            "fraktionen": _json_list(a["fraktionen"]),
            "gwoe_score": a["gwoe_score"],
            "empfehlung": a["empfehlung"],
            "datum": a["datum"],
            "antrag_zusammenfassung": a["antrag_zusammenfassung"],
            "news_count": len(scored),
            "top_news": scored[:top_k_news],
        })

    # Motions with news first, then by news_count, then by gwoe_score — all
    # descending.
    out.sort(
        key=lambda x: (x["news_count"] > 0, x["news_count"], x["gwoe_score"] or 0),
        reverse=True,
    )
    return {
        "antraege": out,
        "filter": filter_info,
    }