gwoe-antragspruefer/app/news_aggregator.py
Dotty Dotter d30fcb132a feat: Stand-Dashboard, Score-Histogram, PM-Markdown, Live-Polling, Cluster-Indicator
Sechs zusammengehoerige UX/Performance-Erweiterungen:

**1. /v2/admin/stand — System-Stand-Dashboard**
KPI-Kacheln (Bewertungen, Plenum-Votes, Match, Vote-Orphans, News, PM-
Drafts, Bookmarks) + GWÖ-Score-Histogram + Per-BL-Tabelle + News-Source-
Tabelle. Auto-Refresh 30 s. Endpoint /api/admin/stand liefert alles in
einem Roundtrip. Nav-Eintrag "Stand" in der Admin-Sektion.

**2. /auswertungen Score-Histogram-Tab**
4. Tab "Score-Verteilung" mit Bar-Chart 0–10. Endpoint
/api/auswertungen/score-histogram liefert Buckets, optional gefiltert
nach Bundesland + Wahlperiode. Reagiert auf den globalen BL-Filter.

**3. PM-Body Markdown-Rendering**
Mini-Renderer im Modal: **bold** / __bold__ / *italic* / _italic_ /
- list-bullets / Doppel-Newline-Paragraphen. Kein externer Markdown-
Parser, keine neue Dependency. Body wird HTML-escaped, Patterns dann
zu Tags umgesetzt.

**4. Performance-Cache fuer themen_matching**
TTL-Cache (60 s) fuer aggregate_top_themen und aggregate_news_cluster.
Cache-Key inkl. aller Filter-Parameter. Automatische Invalidation in
news_aggregator.run_aggregator nach erfolgreichem Insert/Embed.
4 neue Tests fuer cache_get/set/clear-Verhalten.

**5. Stimmverhalten Banner Live-Update**
Statt setTimeout(800) jetzt pollQueueUntilDrained: alle 4 s
GET /api/queue/status, Banner zeigt pending + elapsed live. Bei
pending=0 zwei Polls in Folge: Banner + Stimmverhalten-Charts neu
laden. Max 5 Min Polling-Timeout. Bricht ab wenn Tab gewechselt wird.

**6. Antrag-Detail Cluster-Indicator**
News-Match-Box im Antrag-Detail laedt parallel /aktuelle-themen/cluster
und mappt URL → Cluster. Pro News-Card ein "🔗 Cluster (N News)"-Badge
mit Hover-Tooltip der anderen Cluster-Members. Macht thematische
Bündel sichtbar, ohne Pop-Out auf den Cluster-Tab.

Suite: 1088 → 1092 grün (4 Cache-Tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 02:49:06 +02:00

356 lines
13 KiB
Python

"""News-Aggregator fuer das Aktuelle-Themen-Dashboard (#170 Phase 1).
Fetcht regelmaessig News-Headlines aus AI-erlaubenden, oeffentlich-rechtlichen
oder parlamentarischen Quellen:
- **Tagesschau-API** (https://www.tagesschau.de/api2u/news/) — strukturiertes
JSON mit ressort, tags, firstSentence pro Artikel.
- **Bundestag-Aktuellethemen-RSS**
(https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss)
— RSS mit Titel + Beschreibung pro Artikel.
**Bewusst NICHT verwendet:** RND.de (robots.txt bannt explizit ClaudeBot,
GPTBot, ChatGPT-User, CCBot, Google-Extended). RSS-Feeds privat-publizierter
Verlage werden nur dann angebunden, wenn AI-Verarbeitung explizit erlaubt ist.
**Compliance:**
- Volltexte werden NICHT persistiert. Nur Titel + erster Satz / Description.
- Kein User-Agent, der einen AI-Bot vortaeuscht (kein "ClaudeBot").
- Rate-Limiting: 1 Request pro Quelle pro Aufruf (kein Loop, kein Hammer).
Datenbank-Tabelle ``news_articles`` (siehe app/database.py):
url PK, titel, summary, datum (ISO), source, ressort, tags JSON,
summary_embedding BLOB, embedding_model, fetched_at.
"""
from __future__ import annotations
import json
import logging
import re
import urllib.error
import urllib.request
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
USER_AGENT = "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"
TIMEOUT = 20
# ─────────────────────────────────────────────────────────────────────────────
# Quellen
# ─────────────────────────────────────────────────────────────────────────────
TAGESSCHAU_API = "https://www.tagesschau.de/api2u/news"
# Politische Tagesschau-Ressorts — Sport/Panorama/Sport rausgefiltert,
# weil sie selten zu parlamentarischen Antraegen passen.
TAGESSCHAU_RESSORTS = ["inland", "ausland", "wirtschaft", "wissen"]
BUNDESTAG_RSS = {
"bundestag-aktuell": (
"https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss"
),
"bundestag-presse": (
"https://www.bundestag.de/static/appdata/includes/rss/pressemitteilungen.rss"
),
"bundestag-hib": (
"https://www.bundestag.de/static/appdata/includes/rss/hib.rss"
),
}
# ─────────────────────────────────────────────────────────────────────────────
# HTTP-Helper
# ─────────────────────────────────────────────────────────────────────────────
def _http_get(url: str) -> Optional[bytes]:
"""GET mit ehrlichem User-Agent + Timeout. Gibt None bei Fehler."""
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
try:
with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
return r.read()
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e:
logger.warning("news fetch failed: %s%s", url, e)
return None
def _strip_html(text: str) -> str:
"""Entfernt HTML-Tags + CDATA fuer Plaintext-Summaries."""
if not text:
return ""
text = re.sub(r"<!\[CDATA\[(.*?)\]\]>", r"\1", text, flags=re.DOTALL)
text = re.sub(r"<[^>]+>", " ", text)
text = text.replace("&amp;", "&").replace("&nbsp;", " ").replace("&quot;", '"')
return re.sub(r"\s+", " ", text).strip()
# ─────────────────────────────────────────────────────────────────────────────
# Parser
# ─────────────────────────────────────────────────────────────────────────────
def fetch_tagesschau(ressorts: Optional[list[str]] = None) -> list[dict]:
"""Holt News aus der Tagesschau-API. Liefert Liste von Dicts mit den
Feldern: url, titel, summary, datum, source, ressort, tags.
Volltexte (``content``) werden bewusst nicht uebernommen — nur die in
der API verfuegbare ``firstSentence`` als Summary.
"""
ressorts = ressorts or TAGESSCHAU_RESSORTS
out: list[dict] = []
seen: set[str] = set()
for ressort in ressorts:
url = f"{TAGESSCHAU_API}?ressort={ressort}"
raw = _http_get(url)
if not raw:
continue
try:
data = json.loads(raw.decode("utf-8"))
except json.JSONDecodeError:
logger.warning("tagesschau JSON parse failed: %s", url)
continue
for item in data.get("news") or []:
link = item.get("shareURL") or item.get("detailsweb")
if not link or link in seen:
continue
seen.add(link)
titel = (item.get("title") or "").strip()
if not titel:
continue
summary = (item.get("firstSentence") or "").strip()
datum = item.get("date") or ""
tags = [t.get("tag") for t in (item.get("tags") or []) if t.get("tag")]
out.append({
"url": link,
"titel": titel,
"summary": summary,
"datum": datum,
"source": "tagesschau",
"ressort": item.get("ressort") or ressort,
"tags": tags,
})
return out
_RSS_ITEM_RE = re.compile(r"<item>(.*?)</item>", re.DOTALL)
_RSS_TITLE_RE = re.compile(r"<title>(.*?)</title>", re.DOTALL)
_RSS_LINK_RE = re.compile(r"<link>(.*?)</link>")
_RSS_DESC_RE = re.compile(r"<description>(.*?)</description>", re.DOTALL)
_RSS_PUB_RE = re.compile(r"<pubDate>(.*?)</pubDate>")
def _parse_rss_date(s: str) -> str:
"""Konvertiere RSS-pubDate (RFC 822) → ISO-8601-Datum."""
if not s:
return ""
try:
dt = parsedate_to_datetime(s.strip())
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except (TypeError, ValueError):
return ""
def fetch_rss(source: str, url: str, max_items: int = 50) -> list[dict]:
"""Generischer RSS-2.0-Parser. Liefert dicts wie fetch_tagesschau."""
raw = _http_get(url)
if not raw:
return []
text = raw.decode("utf-8", errors="replace")
items_xml = _RSS_ITEM_RE.findall(text)[:max_items]
out: list[dict] = []
for item in items_xml:
title_m = _RSS_TITLE_RE.search(item)
link_m = _RSS_LINK_RE.search(item)
desc_m = _RSS_DESC_RE.search(item)
pub_m = _RSS_PUB_RE.search(item)
titel = _strip_html(title_m.group(1)) if title_m else ""
link = _strip_html(link_m.group(1)) if link_m else ""
if not titel or not link:
continue
summary = _strip_html(desc_m.group(1)) if desc_m else ""
datum = _parse_rss_date(pub_m.group(1)) if pub_m else ""
out.append({
"url": link,
"titel": titel,
"summary": summary,
"datum": datum,
"source": source,
"ressort": None,
"tags": [],
})
return out
def fetch_all() -> list[dict]:
"""Holt alle konfigurierten Quellen ein. Kein Caching, kein Auto-Retry."""
out: list[dict] = []
out.extend(fetch_tagesschau())
for source, url in BUNDESTAG_RSS.items():
out.extend(fetch_rss(source, url))
return out
# ─────────────────────────────────────────────────────────────────────────────
# DB-Persistierung
# ─────────────────────────────────────────────────────────────────────────────
def upsert_articles(
articles: list[dict],
db_path: Optional[Path] = None,
embed: bool = True,
) -> dict:
"""Schreibe oder aktualisiere News-Artikel in der DB.
Idempotent ueber URL-PK. Existierende Eintraege bekommen ein neues
``fetched_at``, aber Embedding bleibt persistent (sonst LLM-Kosten
pro Cron-Lauf).
Returns:
``{"inserted": int, "updated": int, "embedded": int}``
"""
import sqlite3
from .config import settings
path = db_path or settings.db_path
if not Path(path).exists():
return {"inserted": 0, "updated": 0, "embedded": 0}
conn = sqlite3.connect(str(path))
inserted = 0
updated = 0
embedded = 0
try:
for art in articles:
url = art["url"]
cur = conn.execute(
"SELECT summary_embedding IS NOT NULL FROM news_articles WHERE url=?",
(url,),
)
row = cur.fetchone()
tags_json = json.dumps(art.get("tags") or [])
if row is None:
conn.execute(
"""INSERT INTO news_articles
(url, titel, summary, datum, source, ressort, tags, fetched_at)
VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))""",
(
url, art["titel"], art.get("summary") or "",
art.get("datum") or "",
art["source"], art.get("ressort"), tags_json,
),
)
inserted += 1
else:
conn.execute(
"""UPDATE news_articles
SET titel=?, summary=?, datum=?, source=?, ressort=?, tags=?,
fetched_at=datetime('now')
WHERE url=?""",
(
art["titel"], art.get("summary") or "",
art.get("datum") or "",
art["source"], art.get("ressort"), tags_json,
url,
),
)
updated += 1
conn.commit()
finally:
conn.close()
if embed:
embedded = embed_pending_articles(db_path=db_path)
return {"inserted": inserted, "updated": updated, "embedded": embedded}
def embed_pending_articles(
db_path: Optional[Path] = None,
limit: int = 100,
) -> int:
"""Erzeuge Embeddings fuer alle News-Artikel ohne ``summary_embedding``.
Embedded wird ein Stueck-Text aus Titel + Summary + Tags. Bei
Embedding-API-Fehler wird der Artikel uebersprungen — naechster Run
holt ihn nach.
"""
import sqlite3
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return 0
conn = sqlite3.connect(str(path))
try:
rows = conn.execute(
"""SELECT url, titel, summary, tags FROM news_articles
WHERE summary_embedding IS NULL ORDER BY datum DESC LIMIT ?""",
(limit,),
).fetchall()
finally:
conn.close()
if not rows:
return 0
embedded = 0
conn = sqlite3.connect(str(path))
try:
for url, titel, summary, tags_raw in rows:
try:
tags = json.loads(tags_raw) if tags_raw else []
except (json.JSONDecodeError, TypeError):
tags = []
parts = [titel or ""]
if summary:
parts.append(summary)
if tags:
parts.append(", ".join(tags))
text = "\n".join(p for p in parts if p).strip()
if not text:
continue
try:
vec = emb.create_embedding(text, model=emb.EMBEDDING_MODEL)
except Exception:
logger.exception("embed_pending_articles: API error for %s", url)
continue
conn.execute(
"""UPDATE news_articles
SET summary_embedding=?, embedding_model=?
WHERE url=?""",
(json.dumps(vec).encode(), emb.EMBEDDING_MODEL, url),
)
embedded += 1
conn.commit()
finally:
conn.close()
return embedded
def run_aggregator(db_path: Optional[Path] = None, embed: bool = True) -> dict:
"""Top-Level: alle Quellen holen + persistieren + embedden.
Sicher fuer Cron-Aufrufe — fehlende Quellen werden geloggt, nicht
geworfen.
"""
articles = fetch_all()
result = upsert_articles(articles, db_path=db_path, embed=embed)
# Cache invalidieren, damit das Dashboard die neuen News sofort zeigt.
if result.get("inserted", 0) > 0 or result.get("embedded", 0) > 0:
try:
from . import themen_matching
themen_matching.cache_clear()
except Exception:
logger.exception("themen_matching cache_clear failed")
return result