"""News-Aggregator fuer das Aktuelle-Themen-Dashboard (#170 Phase 1). Fetcht regelmaessig News-Headlines aus AI-erlaubenden, oeffentlich-rechtlichen oder parlamentarischen Quellen: - **Tagesschau-API** (https://www.tagesschau.de/api2u/news/) — strukturiertes JSON mit ressort, tags, firstSentence pro Artikel. - **Bundestag-Aktuellethemen-RSS** (https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss) — RSS mit Titel + Beschreibung pro Artikel. **Bewusst NICHT verwendet:** RND.de (robots.txt bannt explizit ClaudeBot, GPTBot, ChatGPT-User, CCBot, Google-Extended). RSS-Feeds privat-publizierter Verlage werden nur dann angebunden, wenn AI-Verarbeitung explizit erlaubt ist. **Compliance:** - Volltexte werden NICHT persistiert. Nur Titel + erster Satz / Description. - Kein User-Agent, der einen AI-Bot vortaeuscht (kein "ClaudeBot"). - Rate-Limiting: 1 Request pro Quelle pro Aufruf (kein Loop, kein Hammer). Datenbank-Tabelle ``news_articles`` (siehe app/database.py): url PK, titel, summary, datum (ISO), source, ressort, tags JSON, summary_embedding BLOB, embedding_model, fetched_at. """ from __future__ import annotations import json import logging import re import urllib.error import urllib.request from datetime import datetime, timezone from email.utils import parsedate_to_datetime from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) USER_AGENT = "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)" TIMEOUT = 20 # ───────────────────────────────────────────────────────────────────────────── # Quellen # ───────────────────────────────────────────────────────────────────────────── TAGESSCHAU_API = "https://www.tagesschau.de/api2u/news" # Politische Tagesschau-Ressorts — Sport/Panorama/Sport rausgefiltert, # weil sie selten zu parlamentarischen Antraegen passen. 
TAGESSCHAU_RESSORTS = ["inland", "ausland", "wirtschaft", "wissen"]

BUNDESTAG_RSS = {
    "bundestag-aktuell": (
        "https://www.bundestag.de/static/appdata/includes/rss/aktuellethemen.rss"
    ),
    "bundestag-presse": (
        "https://www.bundestag.de/static/appdata/includes/rss/pressemitteilungen.rss"
    ),
    "bundestag-hib": (
        "https://www.bundestag.de/static/appdata/includes/rss/hib.rss"
    ),
}


# ─────────────────────────────────────────────────────────────────────────────
# HTTP helpers
# ─────────────────────────────────────────────────────────────────────────────

def _http_get(url: str) -> Optional[bytes]:
    """Fetch *url* with an honest User-Agent and a timeout.

    Returns:
        The raw response body, or ``None`` when the request fails. Failures
        are logged as warnings and never raised, so one unreachable source
        cannot abort an aggregator run.
    """
    # Function-scope import, like the other lazy imports in this module.
    from http.client import HTTPException

    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
            return resp.read()
    except (OSError, HTTPException) as exc:
        # OSError covers URLError/HTTPError as well as timeouts, connection
        # resets and TLS errors raised while the body is being read;
        # HTTPException covers truncated responses (IncompleteRead).
        logger.warning("news fetch failed: %s — %s", url, exc)
        return None


_CDATA_RE = re.compile(r"<!\[CDATA\[(.*?)\]\]>", re.DOTALL)
_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")


def _strip_html(text: str) -> str:
    """Reduce an HTML/XML fragment to a single-line plain-text string.

    CDATA wrappers are unwrapped, tags are replaced by spaces, character
    references are decoded and runs of whitespace are collapsed.

    Returns:
        The cleaned text; ``""`` for empty or ``None`` input.
    """
    import html

    if not text:
        return ""
    text = _CDATA_RE.sub(r"\1", text)
    text = _TAG_RE.sub(" ", text)
    # Decode entities last, so an escaped "&lt;" stays a literal character
    # instead of being mistaken for markup. unescape() handles every named
    # and numeric reference; "&nbsp;" becomes U+00A0, which the whitespace
    # pass below folds into a normal space.
    text = html.unescape(text)
    return _WHITESPACE_RE.sub(" ", text).strip()


# ─────────────────────────────────────────────────────────────────────────────
# Parsers
# ─────────────────────────────────────────────────────────────────────────────

def fetch_tagesschau(ressorts: Optional[list[str]] = None) -> list[dict]:
    """Fetch news from the Tagesschau API, one request per ressort.

    Args:
        ressorts: Ressort names to query; defaults to ``TAGESSCHAU_RESSORTS``.

    Returns:
        A list of dicts with the keys ``url``, ``titel``, ``summary``,
        ``datum``, ``source``, ``ressort`` and ``tags``. An article that shows
        up in several ressorts is returned once (first occurrence wins).

    Full texts (``content``) are deliberately not copied — only the
    ``firstSentence`` offered by the API is used as the summary.
    """
    ressorts = ressorts or TAGESSCHAU_RESSORTS
    out: list[dict] = []
    seen: set[str] = set()
    for ressort in ressorts:
        url = f"{TAGESSCHAU_API}?ressort={ressort}"
        raw = _http_get(url)
        if not raw:
            continue
        try:
            data = json.loads(raw.decode("utf-8"))
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            logger.warning("tagesschau JSON parse failed: %s", url)
            continue
        news = data.get("news") if isinstance(data, dict) else None
        if not isinstance(news, list):
            continue
        for item in news:
            if not isinstance(item, dict):
                continue
            link = item.get("shareURL") or item.get("detailsweb")
            if not link or link in seen:
                continue
            seen.add(link)
            titel = (item.get("title") or "").strip()
            if not titel:
                continue
            tags = [
                t["tag"]
                for t in (item.get("tags") or [])
                if isinstance(t, dict) and t.get("tag")
            ]
            out.append({
                "url": link,
                "titel": titel,
                "summary": (item.get("firstSentence") or "").strip(),
                "datum": item.get("date") or "",
                "source": "tagesschau",
                "ressort": item.get("ressort") or ressort,
                "tags": tags,
            })
    return out


# Plain RSS 2.0 element pairs. The feeds are matched with regexes rather than
# an XML parser; each pattern captures the raw inner text of one element.
_RSS_ITEM_RE = re.compile(r"<item>(.*?)</item>", re.DOTALL)
_RSS_TITLE_RE = re.compile(r"<title>(.*?)</title>", re.DOTALL)
_RSS_LINK_RE = re.compile(r"<link>(.*?)</link>")
_RSS_DESC_RE = re.compile(r"<description>(.*?)</description>", re.DOTALL)
_RSS_PUB_RE = re.compile(r"<pubDate>(.*?)</pubDate>")


def _parse_rss_date(s: str) -> str:
    """Convert an RSS ``pubDate`` (RFC 822) into an ISO-8601 UTC timestamp.

    A date without zone information is treated as UTC.

    Returns:
        The ISO-8601 string, or ``""`` when *s* is empty or unparseable.
    """
    if not s:
        return ""
    try:
        dt = parsedate_to_datetime(s.strip())
    except (TypeError, ValueError):
        # Older Python versions raise TypeError for garbage, newer ValueError.
        return ""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat()


def fetch_rss(source: str, url: str, max_items: int = 50) -> list[dict]:
    """Fetch and parse a generic RSS 2.0 feed.

    Args:
        source: Value stored in the ``source`` field of every result.
        url: Feed URL.
        max_items: Upper bound on the number of ``<item>`` elements read;
            values below zero are treated as zero.

    Returns:
        Dicts shaped like those of ``fetch_tagesschau`` (``ressort`` is
        ``None``, ``tags`` is empty). Items without title or link are
        skipped. An unreachable feed yields an empty list.
    """
    raw = _http_get(url)
    if not raw:
        return []
    text = raw.decode("utf-8", errors="replace")
    # Clamp: a negative bound would otherwise slice items off the end.
    items_xml = _RSS_ITEM_RE.findall(text)[:max(max_items, 0)]
    out: list[dict] = []
    for item in items_xml:
        title_m = _RSS_TITLE_RE.search(item)
        link_m = _RSS_LINK_RE.search(item)
        titel = _strip_html(title_m.group(1)) if title_m else ""
        link = _strip_html(link_m.group(1)) if link_m else ""
        if not titel or not link:
            continue
        desc_m = _RSS_DESC_RE.search(item)
        pub_m = _RSS_PUB_RE.search(item)
        out.append({
            "url": link,
            "titel": titel,
            "summary": _strip_html(desc_m.group(1)) if desc_m else "",
            "datum": _parse_rss_date(pub_m.group(1)) if pub_m else "",
            "source": source,
            "ressort": None,
            "tags": [],
        })
    return out


def fetch_all() -> list[dict]:
    """Fetch every configured source once. No caching, no automatic retry."""
    out: list[dict] = []
    out.extend(fetch_tagesschau())
    for source, url in BUNDESTAG_RSS.items():
        out.extend(fetch_rss(source, url))
    return out


# ─────────────────────────────────────────────────────────────────────────────
# DB persistence
# ─────────────────────────────────────────────────────────────────────────────

def upsert_articles(
    articles: list[dict],
    db_path: Optional[Path] = None,
    embed: bool = True,
) -> dict:
    """Insert or update news articles in the database.

    Idempotent via the ``url`` primary key. Existing rows get their fields and
    ``fetched_at`` refreshed, but the stored embedding is left untouched
    (otherwise every cron run would incur embedding costs again).

    Args:
        articles: Dicts as produced by the ``fetch_*`` functions.
        db_path: SQLite file; defaults to ``settings.db_path``.
        embed: When true, ``embed_pending_articles`` runs after the write.

    Returns:
        ``{"inserted": int, "updated": int, "embedded": int}``. All zeros
        when the database file does not exist.
    """
    import sqlite3
    from .config import settings

    path = db_path or settings.db_path
    if not Path(path).exists():
        logger.warning("news DB not found, nothing persisted: %s", path)
        return {"inserted": 0, "updated": 0, "embedded": 0}

    inserted = 0
    updated = 0
    embedded = 0
    conn = sqlite3.connect(str(path))
    try:
        for art in articles:
            url = art["url"]
            # Only the existence of the row matters here.
            exists = conn.execute(
                "SELECT 1 FROM news_articles WHERE url=?", (url,)
            ).fetchone() is not None
            fields = (
                art["titel"],
                art.get("summary") or "",
                art.get("datum") or "",
                art["source"],
                art.get("ressort"),
                json.dumps(art.get("tags") or []),
            )
            if exists:
                conn.execute(
                    """UPDATE news_articles SET
                         titel=?, summary=?, datum=?, source=?, ressort=?,
                         tags=?, fetched_at=datetime('now')
                       WHERE url=?""",
                    (*fields, url),
                )
                updated += 1
            else:
                conn.execute(
                    """INSERT INTO news_articles
                       (url, titel, summary, datum, source, ressort, tags,
                        fetched_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))""",
                    (url, *fields),
                )
                inserted += 1
        conn.commit()
    finally:
        conn.close()

    if embed:
        embedded = embed_pending_articles(db_path=db_path)

    return {"inserted": inserted, "updated": updated, "embedded": embedded}


def embed_pending_articles(
    db_path: Optional[Path] = None,
    limit: int = 100,
) -> int:
    """Create embeddings for news articles that have no ``summary_embedding``.

    The embedded text is built from title, summary and tags. When the
    embedding call fails for an article, that article is skipped and logged;
    it stays pending and is picked up by a later run.

    Args:
        db_path: SQLite file; defaults to ``settings.db_path``.
        limit: Maximum number of pending articles handled per call
            (newest ``datum`` first).

    Returns:
        The number of articles that received an embedding.
    """
    import sqlite3
    from .config import settings
    from . import embeddings as emb

    path = db_path or settings.db_path
    if not Path(path).exists():
        return 0

    conn = sqlite3.connect(str(path))
    try:
        rows = conn.execute(
            """SELECT url, titel, summary, tags FROM news_articles
               WHERE summary_embedding IS NULL
               ORDER BY datum DESC LIMIT ?""",
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    if not rows:
        return 0

    embedded = 0
    conn = sqlite3.connect(str(path))
    try:
        for url, titel, summary, tags_raw in rows:
            try:
                tags = json.loads(tags_raw) if tags_raw else []
            except (ValueError, TypeError):
                tags = []
            if not isinstance(tags, list):
                tags = []
            parts = [titel or ""]
            if summary:
                parts.append(summary)
            if tags:
                # str(): stored tags are not guaranteed to be strings.
                parts.append(", ".join(str(t) for t in tags))
            text = "\n".join(p for p in parts if p).strip()
            if not text:
                continue
            try:
                vec = emb.create_embedding(text, model=emb.EMBEDDING_MODEL)
            except Exception:
                logger.exception("embed_pending_articles: API error for %s", url)
                continue
            conn.execute(
                """UPDATE news_articles
                   SET summary_embedding=?, embedding_model=?
                   WHERE url=?""",
                (json.dumps(vec).encode(), emb.EMBEDDING_MODEL, url),
            )
            # Commit per article: an interrupted run keeps the embeddings
            # already paid for, and the write lock is not held across the
            # following API calls.
            conn.commit()
            embedded += 1
    finally:
        conn.close()
    return embedded


def run_aggregator(db_path: Optional[Path] = None, embed: bool = True) -> dict:
    """Top-level entry point: fetch all sources, persist and embed them.

    Intended for cron use — unreachable sources are logged, not raised.

    Returns:
        The counter dict of ``upsert_articles``.
    """
    articles = fetch_all()
    result = upsert_articles(articles, db_path=db_path, embed=embed)
    # Invalidate the cache so the dashboard shows the new articles at once.
    if result.get("inserted", 0) > 0 or result.get("embedded", 0) > 0:
        try:
            from . import themen_matching
            themen_matching.cache_clear()
        except Exception:
            logger.exception("themen_matching cache_clear failed")
    return result