Complete 4-phase feature:
**Phase 1: News aggregator** (`app/news_aggregator.py`)
- Tagesschau API (`/api2u/news?ressort=...`) for inland/ausland/wirtschaft/wissen
- Bundestag RSS for aktuellethemen / pressemitteilungen / hib
- DB table `news_articles` (URL as primary key, idempotent)
- Embeddings via the existing qwen-v4 pipeline
- Cron script `scripts/auto-fetch-news.sh`
- Deliberately NOT included: RND.de (its robots.txt explicitly bans ClaudeBot,
  GPTBot, CCBot, ChatGPT-User, Google-Extended). Only public-service or
  parliamentary sources that permit AI processing are used
- Full texts are NOT persisted (only title + first sentence)
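
The idempotent, URL-keyed storage described above could be sketched with SQLite's upsert clause. This is only an illustration under assumptions: the reduced column set and the `upsert` helper are hypothetical, `ON CONFLICT` needs SQLite 3.24 or newer, and the module itself uses a SELECT followed by INSERT or UPDATE instead.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, url: str, titel: str, summary: str) -> None:
    """Insert a headline, or refresh it if the URL is already stored."""
    conn.execute(
        """INSERT INTO news_articles (url, titel, summary, fetched_at)
           VALUES (?, ?, ?, datetime('now'))
           ON CONFLICT(url) DO UPDATE SET
               titel = excluded.titel,
               summary = excluded.summary,
               fetched_at = excluded.fetched_at""",
        (url, titel, summary),
    )


# In-memory table with a hypothetical, reduced schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE news_articles ("
    "url TEXT PRIMARY KEY, titel TEXT, summary TEXT, fetched_at TEXT)"
)
upsert(conn, "https://example.org/a", "Old title", "First sentence.")
upsert(conn, "https://example.org/a", "New title", "First sentence.")
rows = conn.execute("SELECT url, titel FROM news_articles").fetchall()
```

Running the same fetch twice leaves one row per URL, which is what makes repeated cron runs safe.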
**Phase 2: Topics × motions matching** (`app/themen_matching.py`)
- News embedding × assessment `summary_embedding` via cosine similarity
- `find_anträge_for_news`: the top-K matching motions per news item
- `find_news_for_antrag`: the top-K news items per motion within a date window (90d)
- `aggregate_top_themen`: primary dashboard endpoint
- `aggregate_themen_zeitreihe`: news volume per day × source
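
The matching step above boils down to ranking candidates by cosine similarity. A minimal pure-Python sketch follows; the helper names `cosine` and `top_k`, the toy two-dimensional vectors, and the `min_similarity` default are assumptions for illustration, not the contents of `app/themen_matching.py`.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(
    query: list[float],
    candidates: dict[str, list[float]],
    k: int = 3,
    min_similarity: float = 0.0,
) -> list[tuple[str, float]]:
    """Return the k best-scoring candidate keys with their similarity."""
    scored = [(key, cosine(query, vec)) for key, vec in candidates.items()]
    scored = [(key, score) for key, score in scored if score >= min_similarity]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy embeddings: one news item against three hypothetical motions.
news_vec = [1.0, 0.0]
motions = {"A": [1.0, 0.0], "B": [0.0, 1.0], "C": [1.0, 1.0]}
matches = top_k(news_vec, motions, k=2)
```

With real embeddings the same ranking would typically be vectorized, but the ordering logic stays the same.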
**Phase 3: Dashboard view** (`/aktuelle-themen`)
- New left-hand nav entry "Aktuelle Themen"
- Stacked area chart of news volume per source (30d)
- Per news card: title + summary + tags + top-3 motion match list
  with GWÖ score pill, Drucksache link, and press-release suggestion button
- Filters: time window, top-N, min_similarity
- Auth-protected (require_auth)
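
The data behind a stacked area chart of news volume per source is a count per day and source. The sketch below shows one way to derive it; the `volume_per_day` helper and the `(datum, source)` row shape are assumptions, and the real `aggregate_themen_zeitreihe` may work differently.

```python
from collections import Counter


def volume_per_day(rows: list[tuple[str, str]]) -> dict[str, dict[str, int]]:
    """Count articles per calendar day and source.

    rows holds (datum, source) pairs, where datum is an ISO 8601 string.
    """
    # The first 10 characters of an ISO 8601 timestamp are YYYY-MM-DD.
    counts = Counter((datum[:10], source) for datum, source in rows if datum)
    series: dict[str, dict[str, int]] = {}
    for (day, source), n in sorted(counts.items()):
        series.setdefault(day, {})[source] = n
    return series


rows = [
    ("2025-01-02T08:00:00+00:00", "tagesschau"),
    ("2025-01-02T09:30:00+00:00", "tagesschau"),
    ("2025-01-02T10:00:00+00:00", "bundestag-hib"),
    ("2025-01-03T07:15:00+00:00", "tagesschau"),
    ("", "tagesschau"),  # undated rows are skipped
]
series = volume_per_day(rows)
```

Each inner dict maps a source to its count for that day, which is the shape most charting libraries expect for stacked series.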
**Phase 4: Press release generator** (`app/presse_generator.py`)
- LLM prompt template (200-250 words, GWÖ perspective, JSON output)
- Reuses `QwenBewerter` from app/adapters/qwen_bewerter.py
- DB table `presse_drafts` (persistence)
- POST `/api/aktuelle-themen/generate-presse`, rate-limited to 5/min,
  auth-only (LLM costs)
- GET `/api/aktuelle-themen/drafts` + `/drafts/{id}` for list/detail
- Manual trigger via UI button, no automatic sending
- Generated text is shown in a modal
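
A limit such as 5 requests per minute can be enforced with a sliding window of timestamps. The class below is a hypothetical, framework-free sketch of that idea; how the endpoint actually applies its limit is not shown in this commit.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any window_seconds span."""

    def __init__(self, max_calls=5, window_seconds=60.0, clock=time.monotonic):
        self._max_calls = max_calls
        self._window = window_seconds
        self._clock = clock
        self._calls = deque()  # timestamps of accepted calls, oldest first

    def allow(self) -> bool:
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._calls and now - self._calls[0] >= self._window:
            self._calls.popleft()
        if len(self._calls) >= self._max_calls:
            return False
        self._calls.append(now)
        return True


# Usage with a fake clock so the behaviour is deterministic.
fake_now = [0.0]
limiter = SlidingWindowLimiter(clock=lambda: fake_now[0])
first_six = [limiter.allow() for _ in range(6)]
fake_now[0] = 61.0
after_window = limiter.allow()
```

The sixth call inside the window is rejected; once the window has passed, calls are accepted again. A per-user limit would keep one such limiter per authenticated user.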
**Compliance:**
- Respects robots.txt (RND is not fetched because of its ClaudeBot ban; only
  sources that permit AI processing are used)
- UI shows only title + URL + date + first sentence, no full-text reproduction
- Press releases are explicitly drafts, never sent automatically
- LLM calls are rate-limited and auth-only
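
Whether a source permits a given crawler can be checked with the standard library's `urllib.robotparser`. The robots.txt content below is made up for illustration (it is not RND's actual file), and `is_allowed` is a hypothetical helper.

```python
from urllib.robotparser import RobotFileParser

# Illustrative robots.txt: one AI bot is banned, everyone else is allowed.
ROBOTS_TXT = """\
User-agent: ClaudeBot
Disallow: /

User-agent: *
Allow: /
"""


def is_allowed(robots_txt: str, user_agent: str, url: str) -> bool:
    """Parse robots.txt text and ask whether user_agent may fetch url."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)


banned = is_allowed(ROBOTS_TXT, "ClaudeBot", "https://example.org/news")
permitted = is_allowed(ROBOTS_TXT, "GWOeAntragspruefer/1.0", "https://example.org/news")
```

A check like this only covers the technical side; the decision to exclude a source whose robots.txt targets AI bots remains an editorial one.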
**Tests:** 43 new tests (19 news_aggregator + 16 themen_matching +
8 presse_generator). Suite now at 1048 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
348 lines
13 KiB
Python
"""News aggregator for the Aktuelle-Themen dashboard (#170 Phase 1).

Regularly fetches news headlines from public-service or parliamentary
sources that permit AI processing:

- **Tagesschau API** (https://www.tagesschau.de/api2u/news/): structured
  JSON with ressort, tags, firstSentence per article.
- **Bundestag Aktuelle-Themen RSS**
  (https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss):
  RSS with title + description per article.

**Deliberately NOT used:** RND.de (its robots.txt explicitly bans ClaudeBot,
GPTBot, ChatGPT-User, CCBot, Google-Extended). RSS feeds from privately
published outlets are only connected if AI processing is explicitly permitted.

**Compliance:**
- Full texts are NOT persisted. Only title + first sentence / description.
- No User-Agent that impersonates an AI bot (no "ClaudeBot").
- Rate limiting: one request per feed or ressort per run (no retry loop,
  no hammering).

Database table ``news_articles`` (see app/database.py):
  url PK, titel, summary, datum (ISO), source, ressort, tags JSON,
  summary_embedding BLOB, embedding_model, fetched_at.
"""
from __future__ import annotations

import json
import logging
import re
import urllib.error
import urllib.request
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

USER_AGENT = "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"
TIMEOUT = 20


# ─────────────────────────────────────────────────────────────────────────────
# Sources
# ─────────────────────────────────────────────────────────────────────────────

TAGESSCHAU_API = "https://www.tagesschau.de/api2u/news"

# Political Tagesschau ressorts. Sport and Panorama are filtered out
# because they rarely match parliamentary motions.
TAGESSCHAU_RESSORTS = ["inland", "ausland", "wirtschaft", "wissen"]

BUNDESTAG_RSS = {
    "bundestag-aktuell": (
        "https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss"
    ),
    "bundestag-presse": (
        "https://www.bundestag.de/static/appdata/includes/rss/pressemitteilungen.rss"
    ),
    "bundestag-hib": (
        "https://www.bundestag.de/static/appdata/includes/rss/hib.rss"
    ),
}


# ─────────────────────────────────────────────────────────────────────────────
# HTTP helpers
# ─────────────────────────────────────────────────────────────────────────────


def _http_get(url: str) -> Optional[bytes]:
    """GET with an honest User-Agent and a timeout. Returns None on error."""
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
            return r.read()
    except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e:
        logger.warning("news fetch failed: %s — %s", url, e)
        return None


def _strip_html(text: str) -> str:
    """Strips HTML tags and CDATA wrappers for plain-text summaries."""
    if not text:
        return ""
    text = re.sub(r"<!\[CDATA\[(.*?)\]\]>", r"\1", text, flags=re.DOTALL)
    text = re.sub(r"<[^>]+>", " ", text)
    # Decode the common HTML entities that appear in feed text.
    text = text.replace("&amp;", "&").replace("&nbsp;", " ").replace("&quot;", '"')
    return re.sub(r"\s+", " ", text).strip()


# ─────────────────────────────────────────────────────────────────────────────
# Parsers
# ─────────────────────────────────────────────────────────────────────────────


def fetch_tagesschau(ressorts: Optional[list[str]] = None) -> list[dict]:
    """Fetches news from the Tagesschau API. Returns a list of dicts with the
    fields: url, titel, summary, datum, source, ressort, tags.

    Full texts (``content``) are deliberately not copied; only the
    ``firstSentence`` provided by the API is used as the summary.
    """
    ressorts = ressorts or TAGESSCHAU_RESSORTS
    out: list[dict] = []
    seen: set[str] = set()
    for ressort in ressorts:
        url = f"{TAGESSCHAU_API}?ressort={ressort}"
        raw = _http_get(url)
        if not raw:
            continue
        try:
            data = json.loads(raw.decode("utf-8"))
        except json.JSONDecodeError:
            logger.warning("tagesschau JSON parse failed: %s", url)
            continue
        for item in data.get("news") or []:
            link = item.get("shareURL") or item.get("detailsweb")
            # Skip items without a link and duplicates across ressorts.
            if not link or link in seen:
                continue
            seen.add(link)
            titel = (item.get("title") or "").strip()
            if not titel:
                continue
            summary = (item.get("firstSentence") or "").strip()
            datum = item.get("date") or ""
            tags = [t.get("tag") for t in (item.get("tags") or []) if t.get("tag")]
            out.append({
                "url": link,
                "titel": titel,
                "summary": summary,
                "datum": datum,
                "source": "tagesschau",
                "ressort": item.get("ressort") or ressort,
                "tags": tags,
            })
    return out


_RSS_ITEM_RE = re.compile(r"<item>(.*?)</item>", re.DOTALL)
_RSS_TITLE_RE = re.compile(r"<title>(.*?)</title>", re.DOTALL)
_RSS_LINK_RE = re.compile(r"<link>(.*?)</link>")
_RSS_DESC_RE = re.compile(r"<description>(.*?)</description>", re.DOTALL)
_RSS_PUB_RE = re.compile(r"<pubDate>(.*?)</pubDate>")


def _parse_rss_date(s: str) -> str:
    """Converts an RSS pubDate (RFC 822) to an ISO 8601 timestamp in UTC."""
    if not s:
        return ""
    try:
        dt = parsedate_to_datetime(s.strip())
        # Treat naive timestamps as UTC.
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).isoformat()
    except (TypeError, ValueError):
        return ""


def fetch_rss(source: str, url: str, max_items: int = 50) -> list[dict]:
    """Generic RSS 2.0 parser. Returns dicts shaped like those from fetch_tagesschau."""
    raw = _http_get(url)
    if not raw:
        return []
    text = raw.decode("utf-8", errors="replace")
    items_xml = _RSS_ITEM_RE.findall(text)[:max_items]
    out: list[dict] = []
    for item in items_xml:
        title_m = _RSS_TITLE_RE.search(item)
        link_m = _RSS_LINK_RE.search(item)
        desc_m = _RSS_DESC_RE.search(item)
        pub_m = _RSS_PUB_RE.search(item)
        titel = _strip_html(title_m.group(1)) if title_m else ""
        link = _strip_html(link_m.group(1)) if link_m else ""
        if not titel or not link:
            continue
        summary = _strip_html(desc_m.group(1)) if desc_m else ""
        datum = _parse_rss_date(pub_m.group(1)) if pub_m else ""
        out.append({
            "url": link,
            "titel": titel,
            "summary": summary,
            "datum": datum,
            "source": source,
            "ressort": None,
            "tags": [],
        })
    return out


def fetch_all() -> list[dict]:
    """Fetches all configured sources. No caching, no automatic retry."""
    out: list[dict] = []
    out.extend(fetch_tagesschau())
    for source, url in BUNDESTAG_RSS.items():
        out.extend(fetch_rss(source, url))
    return out


# ─────────────────────────────────────────────────────────────────────────────
# DB persistence
# ─────────────────────────────────────────────────────────────────────────────


def upsert_articles(
    articles: list[dict],
    db_path: Optional[Path] = None,
    embed: bool = True,
) -> dict:
    """Writes or updates news articles in the DB.

    Idempotent via the URL primary key. Existing entries get a new
    ``fetched_at``, but the embedding is kept (otherwise every cron run
    would incur LLM costs).

    Returns:
        ``{"inserted": int, "updated": int, "embedded": int}``
    """
    import sqlite3
    from .config import settings

    path = db_path or settings.db_path
    if not Path(path).exists():
        return {"inserted": 0, "updated": 0, "embedded": 0}

    conn = sqlite3.connect(str(path))
    inserted = 0
    updated = 0
    embedded = 0
    try:
        for art in articles:
            url = art["url"]
            cur = conn.execute(
                "SELECT summary_embedding IS NOT NULL FROM news_articles WHERE url=?",
                (url,),
            )
            row = cur.fetchone()
            tags_json = json.dumps(art.get("tags") or [])
            if row is None:
                conn.execute(
                    """INSERT INTO news_articles
                       (url, titel, summary, datum, source, ressort, tags, fetched_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))""",
                    (
                        url, art["titel"], art.get("summary") or "",
                        art.get("datum") or "",
                        art["source"], art.get("ressort"), tags_json,
                    ),
                )
                inserted += 1
            else:
                conn.execute(
                    """UPDATE news_articles
                       SET titel=?, summary=?, datum=?, source=?, ressort=?, tags=?,
                           fetched_at=datetime('now')
                       WHERE url=?""",
                    (
                        art["titel"], art.get("summary") or "",
                        art.get("datum") or "",
                        art["source"], art.get("ressort"), tags_json,
                        url,
                    ),
                )
                updated += 1
        conn.commit()
    finally:
        conn.close()

    if embed:
        embedded = embed_pending_articles(db_path=db_path)

    return {"inserted": inserted, "updated": updated, "embedded": embedded}


def embed_pending_articles(
    db_path: Optional[Path] = None,
    limit: int = 100,
) -> int:
    """Creates embeddings for news articles without a ``summary_embedding``.

    The embedded text is built from title + summary + tags. If the
    embedding API fails, the article is skipped and picked up by the
    next run.
    """
    import sqlite3
    from .config import settings
    from . import embeddings as emb

    path = db_path or settings.db_path
    if not Path(path).exists():
        return 0

    conn = sqlite3.connect(str(path))
    try:
        rows = conn.execute(
            """SELECT url, titel, summary, tags FROM news_articles
               WHERE summary_embedding IS NULL ORDER BY datum DESC LIMIT ?""",
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    if not rows:
        return 0

    embedded = 0
    conn = sqlite3.connect(str(path))
    try:
        for url, titel, summary, tags_raw in rows:
            try:
                tags = json.loads(tags_raw) if tags_raw else []
            except (json.JSONDecodeError, TypeError):
                tags = []
            parts = [titel or ""]
            if summary:
                parts.append(summary)
            if tags:
                parts.append(", ".join(tags))
            text = "\n".join(p for p in parts if p).strip()
            if not text:
                continue
            try:
                vec = emb.create_embedding(text, model=emb.EMBEDDING_MODEL)
            except Exception:
                logger.exception("embed_pending_articles: API error for %s", url)
                continue
            conn.execute(
                """UPDATE news_articles
                   SET summary_embedding=?, embedding_model=?
                   WHERE url=?""",
                (json.dumps(vec).encode(), emb.EMBEDDING_MODEL, url),
            )
            embedded += 1
        conn.commit()
    finally:
        conn.close()
    return embedded


def run_aggregator(db_path: Optional[Path] = None, embed: bool = True) -> dict:
    """Top level: fetch all sources, persist, and embed.

    Safe for cron runs: failing sources are logged, not raised.
    """
    articles = fetch_all()
    return upsert_articles(articles, db_path=db_path, embed=embed)