gwoe-antragspruefer/app/themen_matching.py
Dotty Dotter d54ce23e42 feat(#170): Aktuelle-Themen-Dashboard — News × Anträge × Pressemitteilungen
Vollständiges 4-Phasen-Feature:

**Phase 1 — News-Aggregator** (`app/news_aggregator.py`)
- Tagesschau-API (`/api2u/news?ressort=...`) für inland/ausland/wirtschaft/wissen
- Bundestag-RSS für aktuellethemen / pressemitteilungen / hib
- DB-Tabelle `news_articles` (URL-PK, idempotent)
- Embeddings via existierender qwen-v4-Pipeline
- Cron-Script `scripts/auto-fetch-news.sh`
- Bewusst NICHT: RND.de (robots.txt bannt explizit ClaudeBot, GPTBot,
  CCBot, ChatGPT-User, Google-Extended). Nur AI-erlaubende, öffentlich-
  rechtliche/parlamentarische Quellen
- Volltexte werden NICHT persistiert (nur Titel + erster Satz)

**Phase 2 — Themen × Anträge Matching** (`app/themen_matching.py`)
- News-Embedding × Assessment-summary_embedding via Cosine-Similarity
- `find_anträge_for_news`: pro News die Top-K passenden Anträge
- `find_news_for_antrag`: pro Antrag Top-K News mit Datums-Fenster (90d)
- `aggregate_top_themen`: primärer Dashboard-Endpoint
- `aggregate_themen_zeitreihe`: News-Volumen pro Tag × Source

**Phase 3 — Dashboard-View** (`/aktuelle-themen`)
- Neuer linker Nav-Eintrag „Aktuelle Themen"
- Stacked-Area-Chart News-Volumen pro Quelle (30d)
- Pro News-Card: Titel + Summary + Tags + Top-3-Antrags-Match-Liste
  mit GWÖ-Score-Pill, Drucksache-Link, PM-Vorschlag-Button
- Filter: Zeitfenster, Top-N, min_similarity
- Auth-protected (require_auth)

**Phase 4 — Pressemitteilungs-Generator** (`app/presse_generator.py`)
- LLM-Prompt-Template (200-250 Worte, GWÖ-Sicht, JSON-Output)
- Reuse von `QwenBewerter` aus app/adapters/qwen_bewerter.py
- DB-Tabelle `presse_drafts` (Persistenz)
- POST `/api/aktuelle-themen/generate-presse` rate-limited 5/min,
  auth-only (LLM-Kosten)
- GET `/api/aktuelle-themen/drafts` + `/drafts/{id}` für Liste/Detail
- Manueller Trigger via UI-Button, kein Auto-Versand
- Modal-Anzeige des generierten Texts

**Compliance:**
- robots.txt-respektierend (ClaudeBot-Bann von RND vermieden, AI-
  erlaubende Quellen verwendet)
- UI zeigt nur Titel+URL+Datum+erster Satz, keine Volltext-Reproduktion
- Pressemitteilungen sind explizit Drafts, nicht Auto-Versand
- LLM-Calls rate-limited, auth-only

**Tests:** 43 neue Tests (19 news_aggregator + 16 themen_matching +
8 presse_generator). Suite jetzt 1048 grün.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:39:36 +02:00

372 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Themen × Anträge Matching fuer das Aktuelle-Themen-Dashboard
(#170 Phase 2).
Verschneidet News-Artikel-Embeddings (aus news_articles.summary_embedding)
mit Antrag-Embeddings (assessments.summary_embedding) per Cosine-Similarity.
Liefert pro News-Artikel die Top-K-passendsten Anträge.
Reuse:
- ``embeddings.cosine_similarity`` fuer den Vektor-Vergleich
- Beide Tabellen nutzen denselben Embedding-Modell-Vektorraum (qwen v4),
daher direkter Cross-Vergleich moeglich
- Filter ueber ``embedding_model``-Spalte, falls Migration laeuft
"""
from __future__ import annotations
import json
import logging
import sqlite3
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
def _load_embeddings(
    db_path: Path,
    table: str,
    select_cols: list[str],
    where_extra: str = "",
    params: tuple = (),
) -> list[dict]:
    """Load rows with a usable ``summary_embedding`` from ``table``.

    Args:
        db_path: SQLite database file. A missing file yields ``[]``.
        table: Table name, interpolated verbatim into the SQL statement.
        select_cols: Column names to select in addition to
            ``summary_embedding`` and ``embedding_model``; interpolated
            verbatim as well.
        where_extra: Raw SQL appended after
            ``WHERE summary_embedding IS NOT NULL`` (so it has to bring its
            own ``AND``/``ORDER BY`` keyword).
        params: Bound parameters for placeholders in ``where_extra``.

    Returns:
        One dict per accepted row: all selected columns plus the decoded
        embedding under the ``"_vec"`` key. Rows whose ``embedding_model``
        differs from ``embeddings.EMBEDDING_MODEL_READ`` and rows whose
        embedding cannot be JSON-decoded are silently dropped.
    """
    from . import embeddings as emb

    if not Path(db_path).exists():
        return []

    connection = sqlite3.connect(str(db_path))
    try:
        connection.row_factory = sqlite3.Row
        column_list = ", ".join(select_cols)
        query = (
            f"SELECT {column_list}, summary_embedding, embedding_model "
            f"FROM {table} "
            f"WHERE summary_embedding IS NOT NULL {where_extra}"
        )
        fetched = connection.execute(query, params).fetchall()
    finally:
        connection.close()

    results = []
    for record in fetched:
        # Only vectors written by the currently active read model are
        # comparable with each other.
        if record["embedding_model"] == emb.EMBEDDING_MODEL_READ:
            try:
                vector = json.loads(record["summary_embedding"])
            except (json.JSONDecodeError, TypeError):
                continue
            results.append({**dict(record), "_vec": vector})
    return results
def find_anträge_for_news(
    news_url: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the motions most similar to one news article.

    The article's embedding is compared against every assessment
    embedding via ``embeddings.cosine_similarity``.

    Args:
        news_url: Primary key of the article in ``news_articles``.
        top_k: Maximum number of matches to return; values <= 0 yield ``[]``.
        min_similarity: Matches scoring below this value are discarded.
            The original author notes 0.4 as the empirical relevance
            threshold for the qwen-v4 embeddings.
        db_path: SQLite file; defaults to ``settings.db_path``.

    Returns:
        Match dicts (``drucksache``, ``title``, ``bundesland``,
        ``fraktionen``, ``gwoe_score``, ``empfehlung``, ``themen``,
        ``datum``, ``similarity``) sorted by descending similarity.
        ``[]`` when the database or article is missing, or the article has
        no embedding for the current read model.
    """
    from .config import settings
    from . import embeddings as emb

    def _json_or_empty(raw):
        # A single malformed/NULL JSON column must not abort the whole
        # lookup; fall back to an empty list for that field instead.
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []

    path = db_path or settings.db_path
    if not Path(path).exists():
        return []

    # 1. Load the news vector.
    conn = sqlite3.connect(str(path))
    try:
        row = conn.execute(
            """SELECT summary_embedding, embedding_model
FROM news_articles WHERE url=?""",
            (news_url,),
        ).fetchone()
    finally:
        conn.close()
    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
        return []
    try:
        news_vec = json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return []

    # 2. Load every assessment that has an embedding and score it.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "themen", "datum"],
    )
    scored = []
    for a in assessments:
        sim = emb.cosine_similarity(news_vec, a["_vec"])
        if sim < min_similarity:
            continue
        scored.append({
            "drucksache": a["drucksache"],
            "title": a["title"],
            "bundesland": a["bundesland"],
            "fraktionen": _json_or_empty(a["fraktionen"]),
            "gwoe_score": a["gwoe_score"],
            "empfehlung": a["empfehlung"],
            "themen": _json_or_empty(a["themen"]),
            "datum": a["datum"],
            "similarity": round(sim, 3),
        })
    scored.sort(key=lambda x: x["similarity"], reverse=True)
    # Clamp so a negative top_k cannot turn into a "drop the tail" slice.
    return scored[:max(top_k, 0)]
def find_news_for_antrag(
    drucksache: str,
    top_k: int = 5,
    min_similarity: float = 0.4,
    days_window: int = 90,
    db_path: Optional[Path] = None,
) -> list[dict]:
    """Return the recent news articles most similar to one motion.

    Args:
        drucksache: Identifier of the motion in ``assessments``.
        top_k: Maximum number of articles to return; values <= 0 yield ``[]``.
        min_similarity: Articles scoring below this value are discarded.
        days_window: Only articles dated within the last ``days_window``
            days are considered. Articles whose ``datum`` cannot be parsed
            are deliberately kept rather than dropped.
        db_path: SQLite file; defaults to ``settings.db_path``.

    Returns:
        Article dicts (``url``, ``titel``, ``summary``, ``datum``,
        ``source``, ``ressort``, ``tags``, ``similarity``) sorted by
        descending similarity. ``[]`` when the database or motion is
        missing, or the motion has no embedding for the current read model.
    """
    from .config import settings
    from . import embeddings as emb

    path = db_path or settings.db_path
    if not Path(path).exists():
        return []

    # 1. Load the motion vector.
    conn = sqlite3.connect(str(path))
    try:
        row = conn.execute(
            """SELECT summary_embedding, embedding_model
FROM assessments WHERE drucksache=?""",
            (drucksache,),
        ).fetchone()
    finally:
        conn.close()
    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
        return []
    try:
        antrag_vec = json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return []

    # 2. Load the news and apply the date window.
    # NOTE(review): a ``datum`` without UTC offset is interpreted in the
    # server's local timezone by ``.timestamp()`` -- confirm stored format.
    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )
    scored = []
    for n in news:
        # Cheap date check first, so stale articles never reach the
        # comparatively expensive similarity computation.
        try:
            news_ts = datetime.fromisoformat(
                n["datum"].replace("Z", "+00:00")
            ).timestamp()
        except (ValueError, AttributeError):
            news_ts = None  # unparsable date: let the article through
        if news_ts is not None and news_ts < cutoff:
            continue

        sim = emb.cosine_similarity(antrag_vec, n["_vec"])
        if sim < min_similarity:
            continue

        try:
            tags = json.loads(n["tags"]) if n["tags"] else []
        except (json.JSONDecodeError, TypeError):
            tags = []
        scored.append({
            "url": n["url"],
            "titel": n["titel"],
            "summary": n["summary"],
            "datum": n["datum"],
            "source": n["source"],
            "ressort": n["ressort"],
            "tags": tags,
            "similarity": round(sim, 3),
        })
    scored.sort(key=lambda x: x["similarity"], reverse=True)
    # Clamp so a negative top_k cannot turn into a "drop the tail" slice.
    return scored[:max(top_k, 0)]
def aggregate_top_themen(
    days_window: int = 7,
    top_k: int = 10,
    min_similarity: float = 0.4,
    matches_per_news: int = 3,
    db_path: Optional[Path] = None,
) -> dict:
    """Return the newest news articles, each with its best-matching motions.

    This is the primary dashboard endpoint.

    Args:
        days_window: Only articles dated within the last N days are used;
            articles with an unparsable ``datum`` are skipped.
        top_k: Maximum number of articles (newest first).
        min_similarity: Motions scoring below this value are discarded.
        matches_per_news: Maximum number of motions kept per article.
        db_path: SQLite file; defaults to ``settings.db_path``.

    Returns:
        ``{
            "buckets": [{
                "news": {url, titel, summary, datum, source, ressort, tags},
                "matches": [{drucksache, title, bundesland, fraktionen,
                             gwoe_score, empfehlung, datum, similarity}]
            }, ...],
            "n_total_news": int,   # all articles with a usable embedding,
                                   # counted before the date filter
            "filter": {days_window, top_k, min_similarity, matches_per_news}
        }``
    """
    from .config import settings
    from . import embeddings as emb

    def _json_or_empty(raw):
        # One malformed/NULL JSON column must not take down the dashboard.
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []

    # Built once so the empty-database response has the same shape as the
    # regular one (including ``matches_per_news``).
    applied_filter = {
        "days_window": days_window,
        "top_k": top_k,
        "min_similarity": min_similarity,
        "matches_per_news": matches_per_news,
    }

    path = db_path or settings.db_path
    if not Path(path).exists():
        return {"buckets": [], "n_total_news": 0, "filter": applied_filter}

    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
    news_rows = _load_embeddings(
        Path(path),
        "news_articles",
        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
    )

    # Keep only articles inside the date window.
    fresh = []
    for n in news_rows:
        try:
            news_ts = datetime.fromisoformat(
                n["datum"].replace("Z", "+00:00")
            ).timestamp()
        except (ValueError, AttributeError):
            continue
        if news_ts < cutoff:
            continue
        n["_ts"] = news_ts
        fresh.append(n)

    # Newest first, then cut to top_k (clamped against negative values).
    fresh.sort(key=lambda x: x["_ts"], reverse=True)
    fresh = fresh[:max(top_k, 0)]

    # Skip the full assessments scan when there is nothing to match against.
    assessments = _load_embeddings(
        Path(path),
        "assessments",
        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
         "empfehlung", "themen", "datum"],
    ) if fresh else []

    per_news_limit = max(matches_per_news, 0)
    buckets = []
    for n in fresh:
        # Score every motion against this article, keep the best ones.
        scored = []
        for a in assessments:
            sim = emb.cosine_similarity(n["_vec"], a["_vec"])
            if sim < min_similarity:
                continue
            scored.append({
                "drucksache": a["drucksache"],
                "title": a["title"],
                "bundesland": a["bundesland"],
                "fraktionen": _json_or_empty(a["fraktionen"]),
                "gwoe_score": a["gwoe_score"],
                "empfehlung": a["empfehlung"],
                "datum": a["datum"],
                "similarity": round(sim, 3),
            })
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        buckets.append({
            "news": {
                "url": n["url"],
                "titel": n["titel"],
                "summary": n["summary"],
                "datum": n["datum"],
                "source": n["source"],
                "ressort": n["ressort"],
                "tags": _json_or_empty(n["tags"]),
            },
            "matches": scored[:per_news_limit],
        })
    return {
        "buckets": buckets,
        "n_total_news": len(news_rows),
        "filter": applied_filter,
    }
def aggregate_themen_zeitreihe(
    days_window: int = 30,
    db_path: Optional[Path] = None,
) -> dict:
    """Count news articles per (day, source) over the last N days.

    Feeds the stacked-area chart: no motion matching happens here, only
    the article volume per source is reported.

    Args:
        days_window: Size of the look-back window in days.
        db_path: SQLite file; defaults to ``settings.db_path``.

    Returns:
        ``{"buckets": [day, ...], "sources": [source, ...],
        "series": {source: [count per day, ...]}}`` where ``day`` is the
        first ten characters of the stored ``datum`` and both lists are
        sorted ascending. Only days that actually have articles appear.
        Rows with an empty or unparsable ``datum`` are ignored.
    """
    from .config import settings

    database = db_path or settings.db_path
    if not Path(database).exists():
        return {"buckets": [], "sources": [], "series": {}}

    oldest_allowed = (
        datetime.now(timezone.utc).timestamp() - days_window * 86400
    )

    connection = sqlite3.connect(str(database))
    try:
        records = connection.execute(
            "SELECT datum, source FROM news_articles"
        ).fetchall()
    finally:
        connection.close()

    per_day_source: defaultdict[tuple[str, str], int] = defaultdict(int)
    for datum, source in records:
        if not datum:
            continue
        try:
            stamp = datetime.fromisoformat(
                datum.replace("Z", "+00:00")
            ).timestamp()
        except (ValueError, AttributeError):
            continue
        if stamp >= oldest_allowed:
            # The day bucket is the YYYY-MM-DD prefix of the stored string.
            per_day_source[(datum[:10], source)] += 1

    # Every counted key contributes exactly one day and one source.
    days_sorted = sorted({day for day, _ in per_day_source})
    sources_sorted = sorted({src for _, src in per_day_source})
    series = {}
    for src in sources_sorted:
        series[src] = [per_day_source[(day, src)] for day in days_sorted]

    return {
        "buckets": days_sorted,
        "sources": sources_sorted,
        "series": series,
    }