gwoe-antragspruefer/app/themen_matching.py
Dotty Dotter e27dfc30a2 feat(#170 followup 2): Pre-Filter, Cluster, Antrags-Initiative, PM-Versionierung, Mail-Link
User-Feedback: Aktuelle-Themen-Dashboard war "Detective-Modus" — durch
viele News scrollen, Match-Stärke selbst interpretieren. Komplett-Refactor
zur kuratierten Sicht mit Tabs.

**1. Pre-Filter + GWÖ-Relevanz-Score (#134)**

`compute_relevance(matches)`: Score = max(antrag.gwoe_score × similarity).
Level: high (≥4.0) / mid (≥2.5) / low (>0) / none.
Pro News in der UI ein farbiger Pill (gruen/orange/grau) + Reason-Text:
"GWÖ-9.0/10-Antrag „Klimaschutzgesetz" (GRÜNE) passt mit Similarity 0.55."

Default-Filter "Nur GWÖ-relevant" aktiv (only_relevant=true) — zeigt
nur high/mid News, blendet Rauschen aus. Toggle-Checkbox.

`/api/aktuelle-themen/top` neuer Param `only_relevant=true|false`.

**2. PM-Versionierung im Modal (#135)**

`list_drafts_for(drucksache, news_url)`: alle Versionen, neueste oben.
Endpoint `/api/aktuelle-themen/drafts-versions`. Modal zeigt Dropdown
wenn >1 Version, Switch ohne LLM-Call. Force-Regen bleibt als Button
im "bestehender Entwurf"-Banner.

**3. News-Cluster-View (#136)**

`aggregate_news_cluster(intra_threshold=0.55, min_cluster_size=2)`:
Greedy-Embedding-Cluster + zentralster Antrags-Match per Centroid-
Vektor. Zweiter Tab "Themen-Cluster": 5 News über "Pflege" → 1 Cluster
mit gemeinsamem Antrag-Vorschlag, statt 5 separate Cards.
Endpoint: `/api/aktuelle-themen/cluster`.

**4. Mail-Direkt-Link + Clipboard (#137)**

Im PM-Modal zwei Buttons:
- "📧 Per Mail versenden" (mailto: mit subject + body, ~1900 Char Limit)
- "📋 In Zwischenablage kopieren" (navigator.clipboard.writeText)
- Bei langem PM (>1900 Char): mailto-Link wird ausgegraut, Hinweis
  "PM zu lang für Mail-Link — Clipboard nutzen"

**5. Antrags-Initiative (#138)**

`aggregate_top_antraege_with_news(min_gwoe_score=8.0, days=14)`:
Reverse-Sicht — pro Antrag mit GWÖ ≥ 8 die News-Resonanz. Antraege
ohne Match werden trotzdem angezeigt mit "keine News"-Pill.
Dritter Tab "GWÖ-Top-Anträge". Endpoint `.../top-antraege`.

**UI-Restrukturierung:** statt einer langen Scroll-Liste jetzt
5 Tabs mit gemeinsamer Filter-Bar:
- News × Anträge (Default, kuratiert via Pre-Filter)
- Themen-Cluster (Bündel ähnlicher News)
- GWÖ-Top-Anträge (Reverse)
- News-Volumen (Chart)
- PM-Entwürfe (Drafts-Liste)

Default min_similarity 0.40 → 0.50 erhoeht (weniger Rauschen).

Tests: 14 neue (compute_relevance × 5, only_relevant + sort × 3,
cluster × 3, top_antraege × 3). Suite 1067 gruen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 13:41:31 +02:00

691 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Themen × Anträge Matching fuer das Aktuelle-Themen-Dashboard
(#170 Phase 2).
Verschneidet News-Artikel-Embeddings (aus news_articles.summary_embedding)
mit Antrag-Embeddings (assessments.summary_embedding) per Cosine-Similarity.
Liefert pro News-Artikel die Top-K-passendsten Anträge.
Reuse:
- ``embeddings.cosine_similarity`` fuer den Vektor-Vergleich
- Beide Tabellen nutzen denselben Embedding-Modell-Vektorraum (qwen v4),
daher direkter Cross-Vergleich moeglich
- Filter ueber ``embedding_model``-Spalte, falls Migration laueft
"""
from __future__ import annotations
import json
import logging
import sqlite3
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
def _load_embeddings(
db_path: Path,
table: str,
select_cols: list[str],
where_extra: str = "",
params: tuple = (),
) -> list[dict]:
"""Generischer Loader fuer Tabellen mit ``summary_embedding``-Spalte.
Liefert Zeilen mit decoded Embedding-Vektor (oder filtert aus, wenn
Modell nicht zum aktuellen READ-Modell passt).
"""
from . import embeddings as emb
if not Path(db_path).exists():
return []
conn = sqlite3.connect(str(db_path))
try:
conn.row_factory = sqlite3.Row
cols = ", ".join(select_cols)
sql = (
f"SELECT {cols}, summary_embedding, embedding_model "
f"FROM {table} "
f"WHERE summary_embedding IS NOT NULL {where_extra}"
)
rows = conn.execute(sql, params).fetchall()
finally:
conn.close()
out = []
for r in rows:
if r["embedding_model"] != emb.EMBEDDING_MODEL_READ:
continue
try:
vec = json.loads(r["summary_embedding"])
except (json.JSONDecodeError, TypeError):
continue
d = dict(r)
d["_vec"] = vec
out.append(d)
return out
def find_anträge_for_news(
news_url: str,
top_k: int = 5,
min_similarity: float = 0.4,
db_path: Optional[Path] = None,
) -> list[dict]:
"""Pro gegebener News-URL: Top-K aehnlichste Antraege per Cosine-Match.
Filter ``min_similarity`` haelt den Cut-Off fuer "passt einigermassen".
0.4 ist empirisch der Punkt, ab dem qwen-v4-Embeddings semantisch
relevant matchen.
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return []
# 1. News-Vektor laden
conn = sqlite3.connect(str(path))
try:
row = conn.execute(
"""SELECT summary_embedding, embedding_model
FROM news_articles WHERE url=?""",
(news_url,),
).fetchone()
finally:
conn.close()
if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
return []
try:
news_vec = json.loads(row[0])
except (json.JSONDecodeError, TypeError):
return []
# 2. Alle Assessments mit Embedding laden + scoren
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "themen", "datum"],
)
scored = []
for a in assessments:
sim = emb.cosine_similarity(news_vec, a["_vec"])
if sim < min_similarity:
continue
scored.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"themen": json.loads(a["themen"] or "[]"),
"datum": a["datum"],
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
return scored[:top_k]
def find_news_for_antrag(
drucksache: str,
top_k: int = 5,
min_similarity: float = 0.4,
days_window: int = 90,
db_path: Optional[Path] = None,
) -> list[dict]:
"""Pro gegebener Drucksache: Top-K aehnlichste News-Artikel per Cosine.
Filtert News auf ein Zeitfenster (Default 90 Tage), damit
Pressemitteilungen aus aktueller Aktualitaet stammen.
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return []
# 1. Antrag-Vektor laden
conn = sqlite3.connect(str(path))
try:
row = conn.execute(
"""SELECT summary_embedding, embedding_model
FROM assessments WHERE drucksache=?""",
(drucksache,),
).fetchone()
finally:
conn.close()
if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
return []
try:
antrag_vec = json.loads(row[0])
except (json.JSONDecodeError, TypeError):
return []
# 2. News mit Datums-Filter laden
cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
news = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
scored = []
for n in news:
sim = emb.cosine_similarity(antrag_vec, n["_vec"])
if sim < min_similarity:
continue
# Datums-Filter
try:
news_ts = datetime.fromisoformat(
n["datum"].replace("Z", "+00:00")
).timestamp()
if news_ts < cutoff:
continue
except (ValueError, AttributeError):
pass # Wenn Datum nicht parsbar, lass es durch
try:
tags = json.loads(n["tags"]) if n["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
scored.append({
"url": n["url"],
"titel": n["titel"],
"summary": n["summary"],
"datum": n["datum"],
"source": n["source"],
"ressort": n["ressort"],
"tags": tags,
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
return scored[:top_k]
def compute_relevance(matches: list[dict]) -> dict:
"""Aggregiere Relevanz-Score + Begruendung aus einer Match-Liste.
Score = max(antrag.gwoe_score × similarity) ueber alle Matches.
Domain: 0..10 (gleicht GWÖ-Score-Skala). Level-Schwellen:
- score >= 4.0 → "high" (mind. ein starkes GWÖ-Match)
- score >= 2.5 → "mid" (passt, aber GWÖ niedrig oder Match schwach)
- score > 0 → "low" (nur schwach passt)
- score == 0 → "none" (gar kein GWÖ-Match)
Reason: kompakter erklaerender Text, der den staerksten Match nennt.
Kein LLM-Call — nur Daten-Synthese.
"""
if not matches:
return {
"score": 0.0,
"level": "none",
"reason": "Keine GWÖ-bewerteten Anträge passen zu dieser News.",
}
# Score-Beitraege berechnen
contribs = []
for m in matches:
gw = m.get("gwoe_score") or 0.0
sim = m.get("similarity") or 0.0
contribs.append((gw * sim, m))
contribs.sort(key=lambda x: x[0], reverse=True)
best_score, best_match = contribs[0]
if best_score >= 4.0:
level = "high"
elif best_score >= 2.5:
level = "mid"
elif best_score > 0:
level = "low"
else:
level = "none"
# Begruendung
fr = ", ".join(best_match.get("fraktionen") or [])
fr_clause = f" ({fr})" if fr else ""
titel = (best_match.get("title") or "").strip()
if len(titel) > 70:
titel = titel[:67] + ""
reason = (
f"GWÖ-{best_match.get('gwoe_score')}/10-Antrag „{titel}" + ("" if titel.endswith("") else "") + ""
f"{fr_clause} passt mit Similarity {best_match.get('similarity')}"
)
if len(matches) > 1:
reason += f"{len(matches) - 1} weitere(r) Match(es)."
else:
reason += "."
return {
"score": round(best_score, 2),
"level": level,
"reason": reason,
}
def aggregate_top_themen(
days_window: int = 7,
top_k: int = 10,
min_similarity: float = 0.4,
matches_per_news: int = 3,
only_relevant: bool = False,
db_path: Optional[Path] = None,
) -> dict:
"""Top-K aktuelle News (letzte N Tage) mit jeweils ihren passendsten
Antraegen — der primaere Dashboard-Endpoint.
Returns:
``{
"buckets": [{
"news": {url, titel, summary, datum, source, ressort, tags},
"matches": [{drucksache, title, gwoe_score, similarity, ...}]
}, ...],
"n_total_news": int,
"filter": {...}
}``
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return {"buckets": [], "n_total_news": 0, "filter": {
"days_window": days_window, "top_k": top_k,
"min_similarity": min_similarity,
}}
cutoff = (
datetime.now(timezone.utc).timestamp() - days_window * 86400
)
news_rows = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
# Nach Datum filtern
fresh = []
for n in news_rows:
try:
news_ts = datetime.fromisoformat(
n["datum"].replace("Z", "+00:00")
).timestamp()
except (ValueError, AttributeError):
continue
if news_ts < cutoff:
continue
n["_ts"] = news_ts
fresh.append(n)
# Nach Datum desc sortieren, top_k cutten
fresh.sort(key=lambda x: x["_ts"], reverse=True)
fresh = fresh[:top_k]
# Pro News: alle Antraege scoren, Top matches_per_news behalten
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "themen", "datum"],
)
buckets = []
for n in fresh:
scored = []
for a in assessments:
sim = emb.cosine_similarity(n["_vec"], a["_vec"])
if sim < min_similarity:
continue
scored.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"datum": a["datum"],
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
try:
tags = json.loads(n["tags"]) if n["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
top_matches = scored[:matches_per_news]
relevance = compute_relevance(top_matches)
# Pre-Filter: optional alle non-high/-mid raus
if only_relevant and relevance["level"] not in ("high", "mid"):
continue
buckets.append({
"news": {
"url": n["url"],
"titel": n["titel"],
"summary": n["summary"],
"datum": n["datum"],
"source": n["source"],
"ressort": n["ressort"],
"tags": tags,
},
"matches": top_matches,
"relevance": relevance,
})
# Sortiere primaer nach Relevanz-Score (high vor mid vor low/none),
# sekundaer nach Datum desc.
level_rank = {"high": 3, "mid": 2, "low": 1, "none": 0}
buckets.sort(
key=lambda b: (
level_rank.get(b["relevance"]["level"], 0),
b["relevance"]["score"],
b["news"]["datum"],
),
reverse=True,
)
return {
"buckets": buckets,
"n_total_news": len(news_rows),
"filter": {
"days_window": days_window,
"top_k": top_k,
"min_similarity": min_similarity,
"matches_per_news": matches_per_news,
"only_relevant": only_relevant,
},
}
def aggregate_themen_zeitreihe(
days_window: int = 30,
db_path: Optional[Path] = None,
) -> dict:
"""News-Volumen pro (Tag, Source) ueber die letzten N Tage —
Stacked-Area-Chart.
Liefert Zeitreihe ohne Antrag-Match — nur die News-Aktivitaet pro
Quelle, damit das Dashboard sehen kann, welche Quellen wie aktiv waren.
"""
from .config import settings
path = db_path or settings.db_path
if not Path(path).exists():
return {"buckets": [], "sources": [], "series": {}}
cutoff_ts = datetime.now(timezone.utc).timestamp() - days_window * 86400
conn = sqlite3.connect(str(path))
try:
rows = conn.execute(
"SELECT datum, source FROM news_articles"
).fetchall()
finally:
conn.close()
counts: defaultdict[tuple[str, str], int] = defaultdict(int)
sources_seen: set[str] = set()
days_seen: set[str] = set()
for datum, source in rows:
if not datum:
continue
try:
ts = datetime.fromisoformat(datum.replace("Z", "+00:00")).timestamp()
except (ValueError, AttributeError):
continue
if ts < cutoff_ts:
continue
day = datum[:10] # YYYY-MM-DD
sources_seen.add(source)
days_seen.add(day)
counts[(day, source)] += 1
days_sorted = sorted(days_seen)
sources_sorted = sorted(sources_seen)
series = {
s: [counts[(d, s)] for d in days_sorted]
for s in sources_sorted
}
return {
"buckets": days_sorted,
"sources": sources_sorted,
"series": series,
}
def aggregate_news_cluster(
days_window: int = 7,
intra_threshold: float = 0.55,
antrag_threshold: float = 0.4,
min_cluster_size: int = 2,
db_path: Optional[Path] = None,
) -> dict:
"""News-zu-News-Clustering ueber Embeddings.
Greedy: jede ungeclusterte News wird Cluster-Seed, alle anderen mit
cosine >= ``intra_threshold`` werden eingeschlossen. Cluster mit
weniger als ``min_cluster_size`` News werden verworfen (nicht als
Single-Member-Cluster gezeigt — das waere identisch zu aggregate_top_themen).
Pro Cluster: zentralster Antrag-Match aus den GWÖ-bewerteten Antraegen.
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return {"clusters": [], "n_total_news": 0}
cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
news_rows = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
fresh = []
for n in news_rows:
try:
ts = datetime.fromisoformat(n["datum"].replace("Z", "+00:00")).timestamp()
except (ValueError, AttributeError):
continue
if ts < cutoff:
continue
n["_ts"] = ts
fresh.append(n)
fresh.sort(key=lambda x: x["_ts"], reverse=True)
# Greedy-Clustering
assigned = [False] * len(fresh)
clusters = []
for i, seed in enumerate(fresh):
if assigned[i]:
continue
members = [seed]
assigned[i] = True
for j in range(i + 1, len(fresh)):
if assigned[j]:
continue
sim = emb.cosine_similarity(seed["_vec"], fresh[j]["_vec"])
if sim >= intra_threshold:
members.append(fresh[j])
assigned[j] = True
if len(members) >= min_cluster_size:
clusters.append(members)
# Pro Cluster: zentralster Antrag (Match gegen den Mittelpunkt-Vektor)
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "datum"],
)
out_clusters = []
for cluster in clusters:
# Mittelpunkt-Embedding (Schwerpunkt)
if not cluster:
continue
dim = len(cluster[0]["_vec"])
centroid = [
sum(m["_vec"][k] for m in cluster) / len(cluster)
for k in range(dim)
]
# Top-Antrag finden
scored_anträge = []
for a in assessments:
sim = emb.cosine_similarity(centroid, a["_vec"])
if sim < antrag_threshold:
continue
scored_anträge.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"datum": a["datum"],
"similarity": round(sim, 3),
})
scored_anträge.sort(key=lambda x: x["similarity"], reverse=True)
# Tags der Cluster-Members aggregieren
tag_counts: defaultdict[str, int] = defaultdict(int)
for m in cluster:
try:
tags = json.loads(m["tags"]) if m["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
for t in tags:
tag_counts[t] += 1
top_tags = [t for t, _ in sorted(
tag_counts.items(), key=lambda x: x[1], reverse=True,
)[:5]]
out_clusters.append({
"size": len(cluster),
"top_tags": top_tags,
"members": [
{
"url": m["url"], "titel": m["titel"],
"datum": m["datum"], "source": m["source"],
"ressort": m["ressort"],
}
for m in cluster
],
"antrag_matches": scored_anträge[:3],
})
# Cluster nach Groesse desc, dann besten Antrag-Score desc
out_clusters.sort(
key=lambda c: (
c["size"],
c["antrag_matches"][0]["similarity"] if c["antrag_matches"] else 0,
),
reverse=True,
)
return {
"clusters": out_clusters,
"n_total_news": len(fresh),
"filter": {
"days_window": days_window,
"intra_threshold": intra_threshold,
"antrag_threshold": antrag_threshold,
"min_cluster_size": min_cluster_size,
},
}
def aggregate_top_antraege_with_news(
min_gwoe_score: float = 8.0,
days_window: int = 14,
min_similarity: float = 0.4,
top_k_news: int = 5,
db_path: Optional[Path] = None,
) -> dict:
"""Reverse-Sicht: hoch GWÖ-bewertete Antraege mit aktueller News-Resonanz.
Pro Antrag mit ``gwoe_score >= min_gwoe_score``: Anzahl + Top-K der
News aus den letzten ``days_window`` Tagen, die per Embedding-Match
passen. Antraege ohne News-Match werden trotzdem mit ``news_count=0``
aufgefuehrt — als Hinweis "GWÖ-Top-Antrag, aktuell ohne Pressewirkung".
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return {"antraege": []}
cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
# Hoch-GWÖ-Antraege laden
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "datum", "antrag_zusammenfassung"],
where_extra=" AND gwoe_score >= ?",
params=(min_gwoe_score,),
)
# Frische News laden
news_rows = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
fresh_news = []
for n in news_rows:
try:
ts = datetime.fromisoformat(n["datum"].replace("Z", "+00:00")).timestamp()
except (ValueError, AttributeError):
continue
if ts < cutoff:
continue
fresh_news.append(n)
out = []
for a in assessments:
scored = []
for n in fresh_news:
sim = emb.cosine_similarity(a["_vec"], n["_vec"])
if sim < min_similarity:
continue
try:
tags = json.loads(n["tags"]) if n["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
scored.append({
"url": n["url"], "titel": n["titel"],
"summary": n["summary"], "datum": n["datum"],
"source": n["source"], "ressort": n["ressort"],
"tags": tags,
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
out.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"datum": a["datum"],
"antrag_zusammenfassung": a["antrag_zusammenfassung"],
"news_count": len(scored),
"top_news": scored[:top_k_news],
})
# Sortierung: Antraege mit News oben, dann nach gwoe_score desc
out.sort(
key=lambda x: (x["news_count"] > 0, x["news_count"], x["gwoe_score"] or 0),
reverse=True,
)
return {
"antraege": out,
"filter": {
"min_gwoe_score": min_gwoe_score,
"days_window": days_window,
"min_similarity": min_similarity,
"top_k_news": top_k_news,
},
}