gwoe-antragspruefer/app/themen_matching.py

372 lines
11 KiB
Python
Raw Normal View History

feat(#170): Aktuelle-Themen-Dashboard — News × Anträge × Pressemitteilungen Vollständiges 4-Phasen-Feature: **Phase 1 — News-Aggregator** (`app/news_aggregator.py`) - Tagesschau-API (`/api2u/news?ressort=...`) für inland/ausland/wirtschaft/wissen - Bundestag-RSS für aktuellethemen / pressemitteilungen / hib - DB-Tabelle `news_articles` (URL-PK, idempotent) - Embeddings via existierender qwen-v4-Pipeline - Cron-Script `scripts/auto-fetch-news.sh` - Bewusst NICHT: RND.de (robots.txt bannt explizit ClaudeBot, GPTBot, CCBot, ChatGPT-User, Google-Extended). Nur AI-erlaubende, öffentlich- rechtliche/parlamentarische Quellen - Volltexte werden NICHT persistiert (nur Titel + erster Satz) **Phase 2 — Themen × Anträge Matching** (`app/themen_matching.py`) - News-Embedding × Assessment-summary_embedding via Cosine-Similarity - `find_anträge_for_news`: pro News die Top-K passenden Anträge - `find_news_for_antrag`: pro Antrag Top-K News mit Datums-Fenster (90d) - `aggregate_top_themen`: primärer Dashboard-Endpoint - `aggregate_themen_zeitreihe`: News-Volumen pro Tag × Source **Phase 3 — Dashboard-View** (`/aktuelle-themen`) - Neuer linker Nav-Eintrag „Aktuelle Themen" - Stacked-Area-Chart News-Volumen pro Quelle (30d) - Pro News-Card: Titel + Summary + Tags + Top-3-Antrags-Match-Liste mit GWÖ-Score-Pill, Drucksache-Link, PM-Vorschlag-Button - Filter: Zeitfenster, Top-N, min_similarity - Auth-protected (require_auth) **Phase 4 — Pressemitteilungs-Generator** (`app/presse_generator.py`) - LLM-Prompt-Template (200-250 Worte, GWÖ-Sicht, JSON-Output) - Reuse von `QwenBewerter` aus app/adapters/qwen_bewerter.py - DB-Tabelle `presse_drafts` (Persistenz) - POST `/api/aktuelle-themen/generate-presse` rate-limited 5/min, auth-only (LLM-Kosten) - GET `/api/aktuelle-themen/drafts` + `/drafts/{id}` für Liste/Detail - Manueller Trigger via UI-Button, kein Auto-Versand - Modal-Anzeige des generierten Texts **Compliance:** - robots.txt-respektierend (ClaudeBot-Bann von RND vermieden, AI- erlaubende Quellen verwendet) - UI zeigt nur Titel+URL+Datum+erster Satz, keine Volltext-Reproduktion - Pressemitteilungen sind explizit Drafts, nicht Auto-Versand - LLM-Calls rate-limited, auth-only **Tests:** 43 neue Tests (19 news_aggregator + 16 themen_matching + 8 presse_generator). Suite jetzt 1048 grün. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:39:36 +02:00
"""Themen × Anträge Matching fuer das Aktuelle-Themen-Dashboard
(#170 Phase 2).
Verschneidet News-Artikel-Embeddings (aus news_articles.summary_embedding)
mit Antrag-Embeddings (assessments.summary_embedding) per Cosine-Similarity.
Liefert pro News-Artikel die Top-K-passendsten Anträge.
Reuse:
- ``embeddings.cosine_similarity`` fuer den Vektor-Vergleich
- Beide Tabellen nutzen denselben Embedding-Modell-Vektorraum (qwen v4),
daher direkter Cross-Vergleich moeglich
- Filter ueber ``embedding_model``-Spalte, falls Migration laueft
"""
from __future__ import annotations
import json
import logging
import sqlite3
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
def _load_embeddings(
db_path: Path,
table: str,
select_cols: list[str],
where_extra: str = "",
params: tuple = (),
) -> list[dict]:
"""Generischer Loader fuer Tabellen mit ``summary_embedding``-Spalte.
Liefert Zeilen mit decoded Embedding-Vektor (oder filtert aus, wenn
Modell nicht zum aktuellen READ-Modell passt).
"""
from . import embeddings as emb
if not Path(db_path).exists():
return []
conn = sqlite3.connect(str(db_path))
try:
conn.row_factory = sqlite3.Row
cols = ", ".join(select_cols)
sql = (
f"SELECT {cols}, summary_embedding, embedding_model "
f"FROM {table} "
f"WHERE summary_embedding IS NOT NULL {where_extra}"
)
rows = conn.execute(sql, params).fetchall()
finally:
conn.close()
out = []
for r in rows:
if r["embedding_model"] != emb.EMBEDDING_MODEL_READ:
continue
try:
vec = json.loads(r["summary_embedding"])
except (json.JSONDecodeError, TypeError):
continue
d = dict(r)
d["_vec"] = vec
out.append(d)
return out
def find_anträge_for_news(
news_url: str,
top_k: int = 5,
min_similarity: float = 0.4,
db_path: Optional[Path] = None,
) -> list[dict]:
"""Pro gegebener News-URL: Top-K aehnlichste Antraege per Cosine-Match.
Filter ``min_similarity`` haelt den Cut-Off fuer "passt einigermassen".
0.4 ist empirisch der Punkt, ab dem qwen-v4-Embeddings semantisch
relevant matchen.
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return []
# 1. News-Vektor laden
conn = sqlite3.connect(str(path))
try:
row = conn.execute(
"""SELECT summary_embedding, embedding_model
FROM news_articles WHERE url=?""",
(news_url,),
).fetchone()
finally:
conn.close()
if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
return []
try:
news_vec = json.loads(row[0])
except (json.JSONDecodeError, TypeError):
return []
# 2. Alle Assessments mit Embedding laden + scoren
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "themen", "datum"],
)
scored = []
for a in assessments:
sim = emb.cosine_similarity(news_vec, a["_vec"])
if sim < min_similarity:
continue
scored.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"themen": json.loads(a["themen"] or "[]"),
"datum": a["datum"],
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
return scored[:top_k]
def find_news_for_antrag(
drucksache: str,
top_k: int = 5,
min_similarity: float = 0.4,
days_window: int = 90,
db_path: Optional[Path] = None,
) -> list[dict]:
"""Pro gegebener Drucksache: Top-K aehnlichste News-Artikel per Cosine.
Filtert News auf ein Zeitfenster (Default 90 Tage), damit
Pressemitteilungen aus aktueller Aktualitaet stammen.
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return []
# 1. Antrag-Vektor laden
conn = sqlite3.connect(str(path))
try:
row = conn.execute(
"""SELECT summary_embedding, embedding_model
FROM assessments WHERE drucksache=?""",
(drucksache,),
).fetchone()
finally:
conn.close()
if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
return []
try:
antrag_vec = json.loads(row[0])
except (json.JSONDecodeError, TypeError):
return []
# 2. News mit Datums-Filter laden
cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
news = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
scored = []
for n in news:
sim = emb.cosine_similarity(antrag_vec, n["_vec"])
if sim < min_similarity:
continue
# Datums-Filter
try:
news_ts = datetime.fromisoformat(
n["datum"].replace("Z", "+00:00")
).timestamp()
if news_ts < cutoff:
continue
except (ValueError, AttributeError):
pass # Wenn Datum nicht parsbar, lass es durch
try:
tags = json.loads(n["tags"]) if n["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
scored.append({
"url": n["url"],
"titel": n["titel"],
"summary": n["summary"],
"datum": n["datum"],
"source": n["source"],
"ressort": n["ressort"],
"tags": tags,
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
return scored[:top_k]
def aggregate_top_themen(
days_window: int = 7,
top_k: int = 10,
min_similarity: float = 0.4,
matches_per_news: int = 3,
db_path: Optional[Path] = None,
) -> dict:
"""Top-K aktuelle News (letzte N Tage) mit jeweils ihren passendsten
Antraegen der primaere Dashboard-Endpoint.
Returns:
``{
"buckets": [{
"news": {url, titel, summary, datum, source, ressort, tags},
"matches": [{drucksache, title, gwoe_score, similarity, ...}]
}, ...],
"n_total_news": int,
"filter": {...}
}``
"""
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return {"buckets": [], "n_total_news": 0, "filter": {
"days_window": days_window, "top_k": top_k,
"min_similarity": min_similarity,
}}
cutoff = (
datetime.now(timezone.utc).timestamp() - days_window * 86400
)
news_rows = _load_embeddings(
Path(path),
"news_articles",
["url", "titel", "summary", "datum", "source", "ressort", "tags"],
)
# Nach Datum filtern
fresh = []
for n in news_rows:
try:
news_ts = datetime.fromisoformat(
n["datum"].replace("Z", "+00:00")
).timestamp()
except (ValueError, AttributeError):
continue
if news_ts < cutoff:
continue
n["_ts"] = news_ts
fresh.append(n)
# Nach Datum desc sortieren, top_k cutten
fresh.sort(key=lambda x: x["_ts"], reverse=True)
fresh = fresh[:top_k]
# Pro News: alle Antraege scoren, Top matches_per_news behalten
assessments = _load_embeddings(
Path(path),
"assessments",
["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
"empfehlung", "themen", "datum"],
)
buckets = []
for n in fresh:
scored = []
for a in assessments:
sim = emb.cosine_similarity(n["_vec"], a["_vec"])
if sim < min_similarity:
continue
scored.append({
"drucksache": a["drucksache"],
"title": a["title"],
"bundesland": a["bundesland"],
"fraktionen": json.loads(a["fraktionen"] or "[]"),
"gwoe_score": a["gwoe_score"],
"empfehlung": a["empfehlung"],
"datum": a["datum"],
"similarity": round(sim, 3),
})
scored.sort(key=lambda x: x["similarity"], reverse=True)
try:
tags = json.loads(n["tags"]) if n["tags"] else []
except (json.JSONDecodeError, TypeError):
tags = []
buckets.append({
"news": {
"url": n["url"],
"titel": n["titel"],
"summary": n["summary"],
"datum": n["datum"],
"source": n["source"],
"ressort": n["ressort"],
"tags": tags,
},
"matches": scored[:matches_per_news],
})
return {
"buckets": buckets,
"n_total_news": len(news_rows),
"filter": {
"days_window": days_window,
"top_k": top_k,
"min_similarity": min_similarity,
"matches_per_news": matches_per_news,
},
}
def aggregate_themen_zeitreihe(
days_window: int = 30,
db_path: Optional[Path] = None,
) -> dict:
"""News-Volumen pro (Tag, Source) ueber die letzten N Tage —
Stacked-Area-Chart.
Liefert Zeitreihe ohne Antrag-Match nur die News-Aktivitaet pro
Quelle, damit das Dashboard sehen kann, welche Quellen wie aktiv waren.
"""
from .config import settings
path = db_path or settings.db_path
if not Path(path).exists():
return {"buckets": [], "sources": [], "series": {}}
cutoff_ts = datetime.now(timezone.utc).timestamp() - days_window * 86400
conn = sqlite3.connect(str(path))
try:
rows = conn.execute(
"SELECT datum, source FROM news_articles"
).fetchall()
finally:
conn.close()
counts: defaultdict[tuple[str, str], int] = defaultdict(int)
sources_seen: set[str] = set()
days_seen: set[str] = set()
for datum, source in rows:
if not datum:
continue
try:
ts = datetime.fromisoformat(datum.replace("Z", "+00:00")).timestamp()
except (ValueError, AttributeError):
continue
if ts < cutoff_ts:
continue
day = datum[:10] # YYYY-MM-DD
sources_seen.add(source)
days_seen.add(day)
counts[(day, source)] += 1
days_sorted = sorted(days_seen)
sources_sorted = sorted(sources_seen)
series = {
s: [counts[(d, s)] for d in days_sorted]
for s in sources_sorted
}
return {
"buckets": days_sorted,
"sources": sources_sorted,
"series": series,
}