gwoe-antragspruefer/app/news_aggregator.py

356 lines
13 KiB
Python
Raw Normal View History

feat(#170): Aktuelle-Themen-Dashboard — News × Anträge × Pressemitteilungen Vollständiges 4-Phasen-Feature: **Phase 1 — News-Aggregator** (`app/news_aggregator.py`) - Tagesschau-API (`/api2u/news?ressort=...`) für inland/ausland/wirtschaft/wissen - Bundestag-RSS für aktuellethemen / pressemitteilungen / hib - DB-Tabelle `news_articles` (URL-PK, idempotent) - Embeddings via existierender qwen-v4-Pipeline - Cron-Script `scripts/auto-fetch-news.sh` - Bewusst NICHT: RND.de (robots.txt bannt explizit ClaudeBot, GPTBot, CCBot, ChatGPT-User, Google-Extended). Nur AI-erlaubende, öffentlich- rechtliche/parlamentarische Quellen - Volltexte werden NICHT persistiert (nur Titel + erster Satz) **Phase 2 — Themen × Anträge Matching** (`app/themen_matching.py`) - News-Embedding × Assessment-summary_embedding via Cosine-Similarity - `find_anträge_for_news`: pro News die Top-K passenden Anträge - `find_news_for_antrag`: pro Antrag Top-K News mit Datums-Fenster (90d) - `aggregate_top_themen`: primärer Dashboard-Endpoint - `aggregate_themen_zeitreihe`: News-Volumen pro Tag × Source **Phase 3 — Dashboard-View** (`/aktuelle-themen`) - Neuer linker Nav-Eintrag „Aktuelle Themen" - Stacked-Area-Chart News-Volumen pro Quelle (30d) - Pro News-Card: Titel + Summary + Tags + Top-3-Antrags-Match-Liste mit GWÖ-Score-Pill, Drucksache-Link, PM-Vorschlag-Button - Filter: Zeitfenster, Top-N, min_similarity - Auth-protected (require_auth) **Phase 4 — Pressemitteilungs-Generator** (`app/presse_generator.py`) - LLM-Prompt-Template (200-250 Worte, GWÖ-Sicht, JSON-Output) - Reuse von `QwenBewerter` aus app/adapters/qwen_bewerter.py - DB-Tabelle `presse_drafts` (Persistenz) - POST `/api/aktuelle-themen/generate-presse` rate-limited 5/min, auth-only (LLM-Kosten) - GET `/api/aktuelle-themen/drafts` + `/drafts/{id}` für Liste/Detail - Manueller Trigger via UI-Button, kein Auto-Versand - Modal-Anzeige des generierten Texts **Compliance:** - robots.txt-respektierend (ClaudeBot-Bann von RND vermieden, AI- erlaubende Quellen verwendet) - UI zeigt nur Titel+URL+Datum+erster Satz, keine Volltext-Reproduktion - Pressemitteilungen sind explizit Drafts, nicht Auto-Versand - LLM-Calls rate-limited, auth-only **Tests:** 43 neue Tests (19 news_aggregator + 16 themen_matching + 8 presse_generator). Suite jetzt 1048 grün. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:39:36 +02:00
"""News-Aggregator fuer das Aktuelle-Themen-Dashboard (#170 Phase 1).
Fetcht regelmaessig News-Headlines aus AI-erlaubenden, oeffentlich-rechtlichen
oder parlamentarischen Quellen:
- **Tagesschau-API** (https://www.tagesschau.de/api2u/news/) strukturiertes
JSON mit ressort, tags, firstSentence pro Artikel.
- **Bundestag-Aktuellethemen-RSS**
(https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss)
RSS mit Titel + Beschreibung pro Artikel.
**Bewusst NICHT verwendet:** RND.de (robots.txt bannt explizit ClaudeBot,
GPTBot, ChatGPT-User, CCBot, Google-Extended). RSS-Feeds privat-publizierter
Verlage werden nur dann angebunden, wenn AI-Verarbeitung explizit erlaubt ist.
**Compliance:**
- Volltexte werden NICHT persistiert. Nur Titel + erster Satz / Description.
- Kein User-Agent, der einen AI-Bot vortaeuscht (kein "ClaudeBot").
- Rate-Limiting: 1 Request pro Quelle pro Aufruf (kein Loop, kein Hammer).
Datenbank-Tabelle ``news_articles`` (siehe app/database.py):
url PK, titel, summary, datum (ISO), source, ressort, tags JSON,
summary_embedding BLOB, embedding_model, fetched_at.
"""
from __future__ import annotations
import json
import logging
import re
import urllib.error
import urllib.request
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
USER_AGENT = "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"
TIMEOUT = 20
# ─────────────────────────────────────────────────────────────────────────────
# Quellen
# ─────────────────────────────────────────────────────────────────────────────
TAGESSCHAU_API = "https://www.tagesschau.de/api2u/news"
# Politische Tagesschau-Ressorts — Sport/Panorama/Sport rausgefiltert,
# weil sie selten zu parlamentarischen Antraegen passen.
TAGESSCHAU_RESSORTS = ["inland", "ausland", "wirtschaft", "wissen"]
BUNDESTAG_RSS = {
"bundestag-aktuell": (
"https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss"
),
"bundestag-presse": (
"https://www.bundestag.de/static/appdata/includes/rss/pressemitteilungen.rss"
),
"bundestag-hib": (
"https://www.bundestag.de/static/appdata/includes/rss/hib.rss"
),
}
# ─────────────────────────────────────────────────────────────────────────────
# HTTP-Helper
# ─────────────────────────────────────────────────────────────────────────────
def _http_get(url: str) -> Optional[bytes]:
"""GET mit ehrlichem User-Agent + Timeout. Gibt None bei Fehler."""
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
try:
with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
return r.read()
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e:
logger.warning("news fetch failed: %s%s", url, e)
return None
def _strip_html(text: str) -> str:
"""Entfernt HTML-Tags + CDATA fuer Plaintext-Summaries."""
if not text:
return ""
text = re.sub(r"<!\[CDATA\[(.*?)\]\]>", r"\1", text, flags=re.DOTALL)
text = re.sub(r"<[^>]+>", " ", text)
text = text.replace("&amp;", "&").replace("&nbsp;", " ").replace("&quot;", '"')
return re.sub(r"\s+", " ", text).strip()
# ─────────────────────────────────────────────────────────────────────────────
# Parser
# ─────────────────────────────────────────────────────────────────────────────
def fetch_tagesschau(ressorts: Optional[list[str]] = None) -> list[dict]:
"""Holt News aus der Tagesschau-API. Liefert Liste von Dicts mit den
Feldern: url, titel, summary, datum, source, ressort, tags.
Volltexte (``content``) werden bewusst nicht uebernommen nur die in
der API verfuegbare ``firstSentence`` als Summary.
"""
ressorts = ressorts or TAGESSCHAU_RESSORTS
out: list[dict] = []
seen: set[str] = set()
for ressort in ressorts:
url = f"{TAGESSCHAU_API}?ressort={ressort}"
raw = _http_get(url)
if not raw:
continue
try:
data = json.loads(raw.decode("utf-8"))
except json.JSONDecodeError:
logger.warning("tagesschau JSON parse failed: %s", url)
continue
for item in data.get("news") or []:
link = item.get("shareURL") or item.get("detailsweb")
if not link or link in seen:
continue
seen.add(link)
titel = (item.get("title") or "").strip()
if not titel:
continue
summary = (item.get("firstSentence") or "").strip()
datum = item.get("date") or ""
tags = [t.get("tag") for t in (item.get("tags") or []) if t.get("tag")]
out.append({
"url": link,
"titel": titel,
"summary": summary,
"datum": datum,
"source": "tagesschau",
"ressort": item.get("ressort") or ressort,
"tags": tags,
})
return out
_RSS_ITEM_RE = re.compile(r"<item>(.*?)</item>", re.DOTALL)
_RSS_TITLE_RE = re.compile(r"<title>(.*?)</title>", re.DOTALL)
_RSS_LINK_RE = re.compile(r"<link>(.*?)</link>")
_RSS_DESC_RE = re.compile(r"<description>(.*?)</description>", re.DOTALL)
_RSS_PUB_RE = re.compile(r"<pubDate>(.*?)</pubDate>")
def _parse_rss_date(s: str) -> str:
"""Konvertiere RSS-pubDate (RFC 822) → ISO-8601-Datum."""
if not s:
return ""
try:
dt = parsedate_to_datetime(s.strip())
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except (TypeError, ValueError):
return ""
def fetch_rss(source: str, url: str, max_items: int = 50) -> list[dict]:
"""Generischer RSS-2.0-Parser. Liefert dicts wie fetch_tagesschau."""
raw = _http_get(url)
if not raw:
return []
text = raw.decode("utf-8", errors="replace")
items_xml = _RSS_ITEM_RE.findall(text)[:max_items]
out: list[dict] = []
for item in items_xml:
title_m = _RSS_TITLE_RE.search(item)
link_m = _RSS_LINK_RE.search(item)
desc_m = _RSS_DESC_RE.search(item)
pub_m = _RSS_PUB_RE.search(item)
titel = _strip_html(title_m.group(1)) if title_m else ""
link = _strip_html(link_m.group(1)) if link_m else ""
if not titel or not link:
continue
summary = _strip_html(desc_m.group(1)) if desc_m else ""
datum = _parse_rss_date(pub_m.group(1)) if pub_m else ""
out.append({
"url": link,
"titel": titel,
"summary": summary,
"datum": datum,
"source": source,
"ressort": None,
"tags": [],
})
return out
def fetch_all() -> list[dict]:
"""Holt alle konfigurierten Quellen ein. Kein Caching, kein Auto-Retry."""
out: list[dict] = []
out.extend(fetch_tagesschau())
for source, url in BUNDESTAG_RSS.items():
out.extend(fetch_rss(source, url))
return out
# ─────────────────────────────────────────────────────────────────────────────
# DB-Persistierung
# ─────────────────────────────────────────────────────────────────────────────
def upsert_articles(
articles: list[dict],
db_path: Optional[Path] = None,
embed: bool = True,
) -> dict:
"""Schreibe oder aktualisiere News-Artikel in der DB.
Idempotent ueber URL-PK. Existierende Eintraege bekommen ein neues
``fetched_at``, aber Embedding bleibt persistent (sonst LLM-Kosten
pro Cron-Lauf).
Returns:
``{"inserted": int, "updated": int, "embedded": int}``
"""
import sqlite3
from .config import settings
path = db_path or settings.db_path
if not Path(path).exists():
return {"inserted": 0, "updated": 0, "embedded": 0}
conn = sqlite3.connect(str(path))
inserted = 0
updated = 0
embedded = 0
try:
for art in articles:
url = art["url"]
cur = conn.execute(
"SELECT summary_embedding IS NOT NULL FROM news_articles WHERE url=?",
(url,),
)
row = cur.fetchone()
tags_json = json.dumps(art.get("tags") or [])
if row is None:
conn.execute(
"""INSERT INTO news_articles
(url, titel, summary, datum, source, ressort, tags, fetched_at)
VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))""",
(
url, art["titel"], art.get("summary") or "",
art.get("datum") or "",
art["source"], art.get("ressort"), tags_json,
),
)
inserted += 1
else:
conn.execute(
"""UPDATE news_articles
SET titel=?, summary=?, datum=?, source=?, ressort=?, tags=?,
fetched_at=datetime('now')
WHERE url=?""",
(
art["titel"], art.get("summary") or "",
art.get("datum") or "",
art["source"], art.get("ressort"), tags_json,
url,
),
)
updated += 1
conn.commit()
finally:
conn.close()
if embed:
embedded = embed_pending_articles(db_path=db_path)
return {"inserted": inserted, "updated": updated, "embedded": embedded}
def embed_pending_articles(
db_path: Optional[Path] = None,
limit: int = 100,
) -> int:
"""Erzeuge Embeddings fuer alle News-Artikel ohne ``summary_embedding``.
Embedded wird ein Stueck-Text aus Titel + Summary + Tags. Bei
Embedding-API-Fehler wird der Artikel uebersprungen naechster Run
holt ihn nach.
"""
import sqlite3
from .config import settings
from . import embeddings as emb
path = db_path or settings.db_path
if not Path(path).exists():
return 0
conn = sqlite3.connect(str(path))
try:
rows = conn.execute(
"""SELECT url, titel, summary, tags FROM news_articles
WHERE summary_embedding IS NULL ORDER BY datum DESC LIMIT ?""",
(limit,),
).fetchall()
finally:
conn.close()
if not rows:
return 0
embedded = 0
conn = sqlite3.connect(str(path))
try:
for url, titel, summary, tags_raw in rows:
try:
tags = json.loads(tags_raw) if tags_raw else []
except (json.JSONDecodeError, TypeError):
tags = []
parts = [titel or ""]
if summary:
parts.append(summary)
if tags:
parts.append(", ".join(tags))
text = "\n".join(p for p in parts if p).strip()
if not text:
continue
try:
vec = emb.create_embedding(text, model=emb.EMBEDDING_MODEL)
except Exception:
logger.exception("embed_pending_articles: API error for %s", url)
continue
conn.execute(
"""UPDATE news_articles
SET summary_embedding=?, embedding_model=?
WHERE url=?""",
(json.dumps(vec).encode(), emb.EMBEDDING_MODEL, url),
)
embedded += 1
conn.commit()
finally:
conn.close()
return embedded
def run_aggregator(db_path: Optional[Path] = None, embed: bool = True) -> dict:
"""Top-Level: alle Quellen holen + persistieren + embedden.
Sicher fuer Cron-Aufrufe fehlende Quellen werden geloggt, nicht
geworfen.
"""
articles = fetch_all()
feat: Stand-Dashboard, Score-Histogram, PM-Markdown, Live-Polling, Cluster-Indicator Sechs zusammengehoerige UX/Performance-Erweiterungen: **1. /v2/admin/stand — System-Stand-Dashboard** KPI-Kacheln (Bewertungen, Plenum-Votes, Match, Vote-Orphans, News, PM- Drafts, Bookmarks) + GWÖ-Score-Histogram + Per-BL-Tabelle + News-Source- Tabelle. Auto-Refresh 30 s. Endpoint /api/admin/stand liefert alles in einem Roundtrip. Nav-Eintrag "Stand" in der Admin-Sektion. **2. /auswertungen Score-Histogram-Tab** 4. Tab "Score-Verteilung" mit Bar-Chart 0–10. Endpoint /api/auswertungen/score-histogram liefert Buckets, optional gefiltert nach Bundesland + Wahlperiode. Reagiert auf den globalen BL-Filter. **3. PM-Body Markdown-Rendering** Mini-Renderer im Modal: **bold** / __bold__ / *italic* / _italic_ / - list-bullets / Doppel-Newline-Paragraphen. Kein externer Markdown- Parser, keine neue Dependency. Body wird HTML-escaped, Patterns dann zu Tags umgesetzt. **4. Performance-Cache fuer themen_matching** TTL-Cache (60 s) fuer aggregate_top_themen und aggregate_news_cluster. Cache-Key inkl. aller Filter-Parameter. Automatische Invalidation in news_aggregator.run_aggregator nach erfolgreichem Insert/Embed. 4 neue Tests fuer cache_get/set/clear-Verhalten. **5. Stimmverhalten Banner Live-Update** Statt setTimeout(800) jetzt pollQueueUntilDrained: alle 4 s GET /api/queue/status, Banner zeigt pending + elapsed live. Bei pending=0 zwei Polls in Folge: Banner + Stimmverhalten-Charts neu laden. Max 5 Min Polling-Timeout. Bricht ab wenn Tab gewechselt wird. **6. Antrag-Detail Cluster-Indicator** News-Match-Box im Antrag-Detail laedt parallel /aktuelle-themen/cluster und mappt URL → Cluster. Pro News-Card ein "🔗 Cluster (N News)"-Badge mit Hover-Tooltip der anderen Cluster-Members. Macht thematische Bündel sichtbar, ohne Pop-Out auf den Cluster-Tab. Suite: 1088 → 1092 grün (4 Cache-Tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 02:49:06 +02:00
result = upsert_articles(articles, db_path=db_path, embed=embed)
# Cache invalidieren, damit das Dashboard die neuen News sofort zeigt.
if result.get("inserted", 0) > 0 or result.get("embedded", 0) > 0:
try:
from . import themen_matching
themen_matching.cache_clear()
except Exception:
logger.exception("themen_matching cache_clear failed")
return result