gwoe-antragspruefer/app/themen_matching.py
Dotty Dotter 7c1e0fa0b0 feat(#170): Chart-Click-Tag-Filter + Transparenz-Banner + top_k 50 default
User-Feedback: "Welche Meldungen werden da angezeigt? Es wurden ja viel
mehr indiziert."

**1. Transparenz-Banner im News-Tab**

Zeigt jetzt explizit:
- "X News angezeigt"
- "Y News im Zeitraum (mit Embedding)"
- "Z News insgesamt embedded"
- Hinweis wenn only_relevant aktiv ist
- Hinweis wenn top_k limitierend ist

**2. Chart als Filter** — Klick auf einen Tag im News-Volumen-Chart
wechselt zum News-Tab und filtert auf diesen Tag.

- Chart bekommt onClick-Handler ueber getElementsAtEventForMode
- Cursor wechselt bei Hover ueber Datenpunkte
- Im News-Tab erscheint Pill "Tag: 2026-05-01 [× Tag-Filter entfernen]"

**3. Backend `single_date`-Param**

`aggregate_top_themen(single_date="YYYY-MM-DD")` filtert auf genau
diesen Tag (overrides days_window). Endpoint: `/api/aktuelle-themen/top
?date=YYYY-MM-DD`. Response neu: `n_in_window`, `n_shown`,
`filter.single_date`.

**4. Default top_k 20 → 50** (max 200), damit weniger oft auf
"top_k limitierend" gestoßen wird.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 21:24:38 +02:00

702 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Themen × Anträge Matching fuer das Aktuelle-Themen-Dashboard
(#170 Phase 2).
Verschneidet News-Artikel-Embeddings (aus news_articles.summary_embedding)
mit Antrag-Embeddings (assessments.summary_embedding) per Cosine-Similarity.
Liefert pro News-Artikel die Top-K-passendsten Anträge.
Reuse:
- ``embeddings.cosine_similarity`` fuer den Vektor-Vergleich
- Beide Tabellen nutzen denselben Embedding-Modell-Vektorraum (qwen v4),
daher direkter Cross-Vergleich moeglich
- Filter ueber ``embedding_model``-Spalte, falls Migration laeuft
"""
from __future__ import annotations
import json
import logging
import sqlite3
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
def _load_embeddings(
    db_path: Path,
    table: str,
    select_cols: list[str],
    where_extra: str = "",
    params: tuple = (),
) -> list[dict]:
    """Load rows with a usable ``summary_embedding`` from ``table``.

    Args:
        db_path: SQLite database file. A missing file yields ``[]``.
        table: Table name, interpolated verbatim into the SQL text.
        select_cols: Columns to select in addition to ``summary_embedding``
            and ``embedding_model``; also interpolated verbatim.
        where_extra: Raw SQL appended after the ``IS NOT NULL`` condition
            (callers in this file pass fragments starting with `` AND``).
        params: Bound parameters for placeholders inside ``where_extra``.

    Returns:
        One dict per kept row holding every selected column plus ``_vec``,
        the JSON-decoded embedding. Rows whose ``embedding_model`` differs
        from ``embeddings.EMBEDDING_MODEL_READ`` or whose embedding cannot
        be decoded are dropped.
    """
    from . import embeddings as emb

    if not Path(db_path).exists():
        return []

    # NOTE(review): table/column names and where_extra are spliced into the
    # statement as text; only ``params`` is bound. Callers must pass trusted
    # identifiers.
    query = (
        "SELECT {}, summary_embedding, embedding_model "
        "FROM {} "
        "WHERE summary_embedding IS NOT NULL {}"
    ).format(", ".join(select_cols), table, where_extra)

    connection = sqlite3.connect(str(db_path))
    try:
        connection.row_factory = sqlite3.Row
        fetched = connection.execute(query, params).fetchall()
    finally:
        connection.close()

    loaded = []
    for record in fetched:
        if record["embedding_model"] == emb.EMBEDDING_MODEL_READ:
            try:
                vector = json.loads(record["summary_embedding"])
            except (json.JSONDecodeError, TypeError):
                # Undecodable embedding: drop the row silently.
                continue
            loaded.append({**dict(record), "_vec": vector})
    return loaded
def find_anträge_for_news(
    news_url: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the assessments most similar to one news article.

    The article is looked up by ``news_url`` in ``news_articles``; every
    assessment returned by ``_load_embeddings`` is scored against it with
    ``embeddings.cosine_similarity`` and the best ``top_k`` are kept.

    Args:
        news_url: URL identifying the article row.
        top_k: Maximum number of matches returned.
        min_similarity: Matches scoring below this value are dropped. The
            default of 0.4 is the original author's empirical cut-off for
            qwen-v4 embeddings.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        Match dicts sorted by ``similarity`` descending (rounded to three
        decimals). Empty when the database file is missing, the article is
        unknown, has no embedding, was embedded by a model other than
        ``EMBEDDING_MODEL_READ``, or its stored vector is not valid JSON.
    """
    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        return []

    from . import embeddings as emb

    def _json_list(raw):
        """Decode a JSON list column; empty or malformed values become []."""
        try:
            return json.loads(raw) if raw else []
        except (json.JSONDecodeError, TypeError):
            return []

    # 1. Load the article vector.
    conn = sqlite3.connect(str(path))
    try:
        row = conn.execute(
            """SELECT summary_embedding, embedding_model
            FROM news_articles WHERE url=?""",
            (news_url,),
        ).fetchone()
    finally:
        conn.close()
    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
        return []
    try:
        news_vec = json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return []

    # 2. Score every assessment that carries an embedding.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "themen", "datum"],
    )
    scored = []
    for a in assessments:
        sim = emb.cosine_similarity(news_vec, a["_vec"])
        if sim < min_similarity:
            continue
        scored.append({
            "drucksache": a["drucksache"],
            "title": a["title"],
            "bundesland": a["bundesland"],
            # Guarded so one malformed row cannot fail the whole lookup.
            "fraktionen": _json_list(a["fraktionen"]),
            "gwoe_score": a["gwoe_score"],
            "empfehlung": a["empfehlung"],
            "themen": _json_list(a["themen"]),
            "datum": a["datum"],
            "similarity": round(sim, 3),
        })
    scored.sort(key=lambda x: x["similarity"], reverse=True)
    return scored[:top_k]
def find_news_for_antrag(
    drucksache: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    days_window: int = 90,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the recent news articles most similar to one assessment.

    Args:
        drucksache: Identifier of the assessment row.
        top_k: Maximum number of articles returned.
        min_similarity: Articles scoring below this value are dropped.
        days_window: Articles older than this many days (relative to the
            current UTC time) are excluded. Articles whose ``datum`` is
            missing or unparsable are deliberately kept.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        Article dicts sorted by ``similarity`` descending (rounded to three
        decimals). Empty when the database file is missing, the assessment
        is unknown, has no embedding, was embedded by a model other than
        ``EMBEDDING_MODEL_READ``, or its stored vector is not valid JSON.
    """
    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        return []

    from . import embeddings as emb

    # 1. Load the assessment vector.
    conn = sqlite3.connect(str(path))
    try:
        row = conn.execute(
            """SELECT summary_embedding, embedding_model
            FROM assessments WHERE drucksache=?""",
            (drucksache,),
        ).fetchone()
    finally:
        conn.close()
    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
        return []
    try:
        antrag_vec = json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return []

    # 2. Load news and apply the date window.
    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )
    scored = []
    for n in news:
        # Cheap date check first, so stale articles never reach the
        # comparatively expensive similarity computation.
        try:
            news_ts = datetime.fromisoformat(
                n["datum"].replace("Z", "+00:00")
            ).timestamp()
        except (ValueError, AttributeError):
            news_ts = None  # unparsable or missing date: let it through
        if news_ts is not None and news_ts < cutoff:
            continue

        sim = emb.cosine_similarity(antrag_vec, n["_vec"])
        if sim < min_similarity:
            continue

        try:
            tags = json.loads(n["tags"]) if n["tags"] else []
        except (json.JSONDecodeError, TypeError):
            tags = []
        scored.append({
            "url": n["url"],
            "titel": n["titel"],
            "summary": n["summary"],
            "datum": n["datum"],
            "source": n["source"],
            "ressort": n["ressort"],
            "tags": tags,
            "similarity": round(sim, 3),
        })
    scored.sort(key=lambda x: x["similarity"], reverse=True)
    return scored[:top_k]
def compute_relevance(matches: list[dict]) -> dict:
    """Derive a relevance score, level and explanation from a match list.

    The score is the maximum of ``gwoe_score * similarity`` over all
    matches (missing or falsy values count as 0). Level thresholds:

    - score >= 4.0 -> ``"high"``
    - score >= 2.5 -> ``"mid"``
    - score > 0    -> ``"low"``
    - otherwise    -> ``"none"``

    The reason is a short German sentence naming the strongest match; it is
    assembled from the match data only (no LLM call).

    Args:
        matches: Match dicts; the keys read are ``gwoe_score``,
            ``similarity``, ``title`` and ``fraktionen``.

    Returns:
        ``{"score": float, "level": str, "reason": str}`` with the score
        rounded to two decimals.
    """
    if not matches:
        return {
            "score": 0.0,
            "level": "none",
            "reason": "Keine GWÖ-bewerteten Anträge passen zu dieser News.",
        }

    def _contribution(match: dict) -> float:
        """Score contribution of one match: GWÖ score times similarity."""
        return (match.get("gwoe_score") or 0.0) * (match.get("similarity") or 0.0)

    # max() returns the first of several equal maxima, i.e. the same element
    # a stable descending sort would place first.
    best_match = max(matches, key=_contribution)
    best_score = _contribution(best_match)

    if best_score >= 4.0:
        level = "high"
    elif best_score >= 2.5:
        level = "mid"
    elif best_score > 0:
        level = "low"
    else:
        level = "none"

    # Explanation text. Typographic characters are written as escapes so
    # they survive tooling that strips "ambiguous" Unicode.
    fraktionen = ", ".join(best_match.get("fraktionen") or [])
    fr_clause = f" ({fraktionen})" if fraktionen else ""
    titel = (best_match.get("title") or "").strip()
    if len(titel) > 70:
        # Mark the cut with an ellipsis instead of truncating silently.
        titel = titel[:67] + "\u2026"
    reason = (
        f"GWÖ-{best_match.get('gwoe_score')}/10-Antrag \u201e{titel}\u201c"
        f"{fr_clause} passt mit Similarity {best_match.get('similarity')}"
    )
    further = len(matches) - 1
    if further > 0:
        # Separator keeps the count from fusing with the similarity value.
        reason += f"; {further} weitere(r) Match(es)."
    else:
        reason += "."

    return {
        "score": round(best_score, 2),
        "level": level,
        "reason": reason,
    }
def aggregate_top_themen(
    days_window: int = 7,
    top_k: int = 10,
    min_similarity: float = 0.4,
    matches_per_news: int = 3,
    only_relevant: bool = False,
    single_date: Optional[str] = None,
    db_path: Optional[Path] = None,
) -> dict:
    """Return recent news articles together with their best assessment matches.

    Args:
        days_window: Keep articles newer than this many days (relative to
            the current UTC time). Ignored when ``single_date`` is set.
        top_k: Number of newest articles kept before matching.
        min_similarity: Matches scoring below this value are dropped.
        matches_per_news: Matches kept per article.
        only_relevant: When true, articles whose relevance level is not
            ``"high"`` or ``"mid"`` are removed after the ``top_k`` cut, so
            fewer than ``top_k`` buckets may remain.
        single_date: ``YYYY-MM-DD`` prefix; when set, only articles whose
            ``datum`` string starts with it are kept.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        ``{
            "buckets": [{"news": {...}, "matches": [...], "relevance": {...}}],
            "n_total_news": articles with a usable embedding,
            "n_in_window": articles passing the date filter (before top_k),
            "n_shown": number of buckets returned,
            "filter": echo of the effective parameters,
        }``
        The same keys are present, with zero counts, when the database file
        does not exist. Articles with a missing or unparsable ``datum`` are
        never included.
    """
    filter_info = {
        "days_window": days_window,
        "top_k": top_k,
        "min_similarity": min_similarity,
        "matches_per_news": matches_per_news,
        "only_relevant": only_relevant,
        "single_date": single_date,
    }

    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        # Same shape as the regular response so consumers need no special
        # case for a missing database.
        return {
            "buckets": [],
            "n_total_news": 0,
            "n_in_window": 0,
            "n_shown": 0,
            "filter": filter_info,
        }

    from . import embeddings as emb

    def _json_list(raw):
        """Decode a JSON list column; empty or malformed values become []."""
        try:
            return json.loads(raw) if raw else []
        except (json.JSONDecodeError, TypeError):
            return []

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news_rows = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )

    # Date filter.
    fresh = []
    for n in news_rows:
        try:
            news_ts = datetime.fromisoformat(
                n["datum"].replace("Z", "+00:00")
            ).timestamp()
        except (ValueError, AttributeError):
            continue
        # single_date takes precedence over the rolling window.
        if single_date:
            if not n["datum"].startswith(single_date):
                continue
        elif news_ts < cutoff:
            continue
        n["_ts"] = news_ts
        fresh.append(n)
    n_in_window = len(fresh)

    # Newest first, then cut to top_k.
    fresh.sort(key=lambda x: x["_ts"], reverse=True)
    fresh = fresh[:top_k]

    # Nothing to match against when no article survived the filter.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "themen", "datum"],
    ) if fresh else []

    buckets = []
    for n in fresh:
        scored = []
        for a in assessments:
            sim = emb.cosine_similarity(n["_vec"], a["_vec"])
            if sim < min_similarity:
                continue
            scored.append({
                "drucksache": a["drucksache"],
                "title": a["title"],
                "bundesland": a["bundesland"],
                "fraktionen": _json_list(a["fraktionen"]),
                "gwoe_score": a["gwoe_score"],
                "empfehlung": a["empfehlung"],
                "datum": a["datum"],
                "similarity": round(sim, 3),
            })
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        top_matches = scored[:matches_per_news]
        relevance = compute_relevance(top_matches)
        # Optional pre-filter: drop everything that is not high/mid.
        if only_relevant and relevance["level"] not in ("high", "mid"):
            continue
        buckets.append({
            "news": {
                "url": n["url"],
                "titel": n["titel"],
                "summary": n["summary"],
                "datum": n["datum"],
                "source": n["source"],
                "ressort": n["ressort"],
                "tags": _json_list(n["tags"]),
            },
            "matches": top_matches,
            "relevance": relevance,
        })

    # Order by level (high > mid > low > none), then score, then the datum
    # string, all descending.
    level_rank = {"high": 3, "mid": 2, "low": 1, "none": 0}
    buckets.sort(
        key=lambda b: (
            level_rank.get(b["relevance"]["level"], 0),
            b["relevance"]["score"],
            b["news"]["datum"],
        ),
        reverse=True,
    )
    return {
        "buckets": buckets,
        "n_total_news": len(news_rows),
        "n_in_window": n_in_window,
        "n_shown": len(buckets),
        "filter": filter_info,
    }
def aggregate_themen_zeitreihe(
    days_window: int = 30,
    db_path: Optional[Path] = None,
) -> dict:
    """Count news articles per (day, source) over the last ``days_window`` days.

    Reads ``datum`` and ``source`` of every row in ``news_articles``; no
    embedding filter and no assessment matching is applied here.

    Args:
        days_window: Rows older than this many days (relative to the
            current UTC time) are ignored.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        ``{"buckets": [day, ...], "sources": [source, ...],
        "series": {source: [count per day, ...]}}`` with days and sources
        sorted ascending. A day is the first ten characters of ``datum``.
        Only days with at least one article appear. Rows with a missing or
        unparsable ``datum`` are skipped; rows without a source are counted
        under ``"unbekannt"``. All three containers are empty when the
        database file does not exist.
    """
    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        return {"buckets": [], "sources": [], "series": {}}

    cutoff_ts = datetime.now(timezone.utc).timestamp() - days_window * 86400
    conn = sqlite3.connect(str(path))
    try:
        rows = conn.execute(
            "SELECT datum, source FROM news_articles"
        ).fetchall()
    finally:
        conn.close()

    counts: defaultdict[tuple[str, str], int] = defaultdict(int)
    for datum, source in rows:
        if not datum:
            continue
        try:
            ts = datetime.fromisoformat(datum.replace("Z", "+00:00")).timestamp()
        except (ValueError, AttributeError):
            continue
        if ts < cutoff_ts:
            continue
        # A NULL source would make sorted() fail on a None/str mix below,
        # so bucket such rows under a fixed label.
        counts[(datum[:10], source or "unbekannt")] += 1

    days_sorted = sorted({day for day, _ in counts})
    sources_sorted = sorted({src for _, src in counts})
    series = {
        # .get() avoids inserting zero entries into the defaultdict.
        s: [counts.get((d, s), 0) for d in days_sorted]
        for s in sources_sorted
    }
    return {
        "buckets": days_sorted,
        "sources": sources_sorted,
        "series": series,
    }
def aggregate_news_cluster(
    days_window: int = 7,
    intra_threshold: float = 0.55,
    antrag_threshold: float = 0.4,
    min_cluster_size: int = 2,
    db_path: Optional[Path] = None,
) -> dict:
    """Cluster recent news articles by embedding and match each cluster.

    Clustering is greedy over the articles sorted newest first: each article
    not yet assigned becomes a seed, and every later unassigned article
    whose cosine similarity to the seed is at least ``intra_threshold``
    joins it. Groups smaller than ``min_cluster_size`` are discarded; their
    members stay assigned and are not reconsidered. For every remaining
    cluster the mean vector of its members is compared with all assessment
    embeddings and the three best matches are reported.

    Args:
        days_window: Articles older than this many days (relative to the
            current UTC time) are ignored, as are articles with a missing
            or unparsable ``datum``.
        intra_threshold: Minimum similarity to the seed for membership.
        antrag_threshold: Minimum centroid similarity for an assessment.
        min_cluster_size: Minimum number of members for a cluster to be kept.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        ``{"clusters": [{"size", "top_tags", "members", "antrag_matches"}],
        "n_total_news": number of articles inside the window,
        "filter": echo of the parameters}``. Clusters are ordered by size,
        then by their best assessment similarity, both descending. The same
        keys are present when the database file does not exist.
    """
    filter_info = {
        "days_window": days_window,
        "intra_threshold": intra_threshold,
        "antrag_threshold": antrag_threshold,
        "min_cluster_size": min_cluster_size,
    }

    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        # Same shape as the regular response.
        return {"clusters": [], "n_total_news": 0, "filter": filter_info}

    from . import embeddings as emb

    def _json_list(raw):
        """Decode a JSON list column; empty or malformed values become []."""
        try:
            return json.loads(raw) if raw else []
        except (json.JSONDecodeError, TypeError):
            return []

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news_rows = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )
    fresh = []
    for n in news_rows:
        try:
            ts = datetime.fromisoformat(n["datum"].replace("Z", "+00:00")).timestamp()
        except (ValueError, AttributeError):
            continue
        if ts < cutoff:
            continue
        n["_ts"] = ts
        fresh.append(n)
    fresh.sort(key=lambda x: x["_ts"], reverse=True)

    # Greedy clustering around seeds.
    assigned = [False] * len(fresh)
    clusters = []
    for i, seed in enumerate(fresh):
        if assigned[i]:
            continue
        members = [seed]
        assigned[i] = True
        for j in range(i + 1, len(fresh)):
            if assigned[j]:
                continue
            sim = emb.cosine_similarity(seed["_vec"], fresh[j]["_vec"])
            if sim >= intra_threshold:
                members.append(fresh[j])
                assigned[j] = True
        if len(members) >= min_cluster_size:
            clusters.append(members)

    # Assessments are only needed when at least one cluster was formed.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "datum"],
    ) if clusters else []

    out_clusters = []
    for cluster in clusters:
        # Centroid: component-wise mean over the members, using the first
        # member's dimensionality (every cluster contains at least its seed).
        dim = len(cluster[0]["_vec"])
        centroid = [
            sum(m["_vec"][k] for m in cluster) / len(cluster)
            for k in range(dim)
        ]

        # Assessments closest to the centroid.
        scored_anträge = []
        for a in assessments:
            sim = emb.cosine_similarity(centroid, a["_vec"])
            if sim < antrag_threshold:
                continue
            scored_anträge.append({
                "drucksache": a["drucksache"],
                "title": a["title"],
                "bundesland": a["bundesland"],
                "fraktionen": _json_list(a["fraktionen"]),
                "gwoe_score": a["gwoe_score"],
                "empfehlung": a["empfehlung"],
                "datum": a["datum"],
                "similarity": round(sim, 3),
            })
        scored_anträge.sort(key=lambda x: x["similarity"], reverse=True)

        # Five most frequent tags across the members.
        tag_counts: defaultdict[str, int] = defaultdict(int)
        for m in cluster:
            for t in _json_list(m["tags"]):
                tag_counts[t] += 1
        top_tags = [t for t, _ in sorted(
            tag_counts.items(), key=lambda x: x[1], reverse=True,
        )[:5]]

        out_clusters.append({
            "size": len(cluster),
            "top_tags": top_tags,
            "members": [
                {
                    "url": m["url"], "titel": m["titel"],
                    "datum": m["datum"], "source": m["source"],
                    "ressort": m["ressort"],
                }
                for m in cluster
            ],
            "antrag_matches": scored_anträge[:3],
        })

    # Largest clusters first, ties broken by best assessment similarity.
    out_clusters.sort(
        key=lambda c: (
            c["size"],
            c["antrag_matches"][0]["similarity"] if c["antrag_matches"] else 0,
        ),
        reverse=True,
    )
    return {
        "clusters": out_clusters,
        "n_total_news": len(fresh),
        "filter": filter_info,
    }
def aggregate_top_antraege_with_news(
    min_gwoe_score: float = 8.0,
    days_window: int = 14,
    min_similarity: float = 0.4,
    top_k_news: int = 5,
    db_path: Optional[Path] = None,
) -> dict:
    """Reverse view: highly scored assessments with their recent news echo.

    For every assessment with ``gwoe_score >= min_gwoe_score`` all news
    articles inside the date window are scored by embedding similarity.
    Assessments without any matching article are still listed, with
    ``news_count`` 0.

    Args:
        min_gwoe_score: Lower bound on ``gwoe_score``, applied in SQL.
        days_window: Articles older than this many days (relative to the
            current UTC time) are ignored, as are articles with a missing
            or unparsable ``datum``.
        min_similarity: Articles scoring below this value are not counted.
        top_k_news: Number of best articles included per assessment.
        db_path: SQLite file; falls back to ``settings.db_path`` when falsy.

    Returns:
        ``{"antraege": [...], "filter": {...}}``. Each entry carries the
        assessment fields, ``news_count`` (all articles above the
        threshold) and ``top_news`` (the best ``top_k_news`` of them).
        Entries with news come first, then higher ``news_count``, then
        higher ``gwoe_score``. The same keys are present when the database
        file does not exist.
    """
    filter_info = {
        "min_gwoe_score": min_gwoe_score,
        "days_window": days_window,
        "min_similarity": min_similarity,
        "top_k_news": top_k_news,
    }

    path = db_path
    if not path:
        # Settings are only needed when the caller gave no explicit path.
        from .config import settings
        path = settings.db_path
    if not Path(path).exists():
        # Same shape as the regular response.
        return {"antraege": [], "filter": filter_info}

    from . import embeddings as emb

    def _json_list(raw):
        """Decode a JSON list column; empty or malformed values become []."""
        try:
            return json.loads(raw) if raw else []
        except (json.JSONDecodeError, TypeError):
            return []

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400

    # Assessments at or above the GWÖ threshold.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "datum", "antrag_zusammenfassung"],
        where_extra=" AND gwoe_score >= ?",
        params=(min_gwoe_score,),
    )

    # Articles inside the window. The response payload (including the
    # decoded tags) is built once per article, not once per pair.
    news_rows = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )
    fresh_news = []
    for n in news_rows:
        try:
            ts = datetime.fromisoformat(n["datum"].replace("Z", "+00:00")).timestamp()
        except (ValueError, AttributeError):
            continue
        if ts < cutoff:
            continue
        payload = {
            "url": n["url"], "titel": n["titel"],
            "summary": n["summary"], "datum": n["datum"],
            "source": n["source"], "ressort": n["ressort"],
            "tags": _json_list(n["tags"]),
        }
        fresh_news.append((n["_vec"], payload))

    out = []
    for a in assessments:
        scored = []
        for news_vec, payload in fresh_news:
            sim = emb.cosine_similarity(a["_vec"], news_vec)
            if sim < min_similarity:
                continue
            scored.append({**payload, "similarity": round(sim, 3)})
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        out.append({
            "drucksache": a["drucksache"],
            "title": a["title"],
            "bundesland": a["bundesland"],
            "fraktionen": _json_list(a["fraktionen"]),
            "gwoe_score": a["gwoe_score"],
            "empfehlung": a["empfehlung"],
            "datum": a["datum"],
            "antrag_zusammenfassung": a["antrag_zusammenfassung"],
            "news_count": len(scored),
            "top_news": scored[:top_k_news],
        })

    # Assessments with news first, then by news count, then by GWÖ score.
    out.sort(
        key=lambda x: (x["news_count"] > 0, x["news_count"], x["gwoe_score"] or 0),
        reverse=True,
    )
    return {
        "antraege": out,
        "filter": filter_info,
    }