— Daten
{{ icon("chart-bar", 14) }} Auswertungen
+
{{ icon("book-open", 14) }} Aktuelle Themen
{{ icon("file-csv", 14) }} Export · API
{{ icon("rss", 14) }} Atom-Feed
{{ icon("envelope-simple", 14) }} Meine Abos
diff --git a/app/templates/v2/screens/aktuelle-themen.html b/app/templates/v2/screens/aktuelle-themen.html
new file mode 100644
index 0000000..30fd08e
--- /dev/null
+++ b/app/templates/v2/screens/aktuelle-themen.html
@@ -0,0 +1,417 @@
+{% extends "v2/base.html" %}
+
+{% block title %}Aktuelle Themen — GWÖ-Antragsprüfer{% endblock %}
+
+{% set v2_active_nav = "aktuelle-themen" %}
+
+{% block head_extra %}
+
+
+{% endblock %}
+
+{% block main %}
+
+
Aktuelle Themen
+
+ Tagesschau + Bundestag-RSS · gematcht mit deinen Anträgen ·
+ Pressemitteilungs-Vorschläge
+
+
+
+
+
+ Die täglich aktuellen politischen Top-Themen aus
+ öffentlich-rechtlichen + parlamentarischen Quellen
+ (Tagesschau-API + Bundestag-RSS) werden semantisch mit den von dir
+ bewerteten Anträgen verschnitten. Pro News-Artikel siehst du die
+ GWÖ-Bewertung der dazu passendsten Anträge — und kannst per Klick
+ eine Pressemitteilung generieren lassen.
+
+
+ Bewusst nicht verwendet: Quellen mit AI-Bann in
+ robots.txt (z.B. RND.de). Die UI zeigt nur Titel + URL + erste Sätze
+ — Volltexte werden nicht persistiert.
+
+
+
+
+ Zeitfenster:
+
+ 3 Tage
+ 7 Tage
+ 14 Tage
+ 30 Tage
+
+ Top-N News:
+
+ Min. Similarity:
+
+ 0.30 (locker)
+ 0.40 (default)
+ 0.50 (streng)
+
+ Aktualisieren
+
+
+
+
+ News-Volumen pro Quelle (letzte 30 Tage)
+
+
+
+
+
+
+
+
+ Top-Themen × passende Anträge
+
+
+
+
+
+ Pressemitteilungs-Entwürfe (zuletzt generiert)
+
+
+
+
+
+
+
×
+
Pressemitteilung
+
Generiere …
+
+
+
+{% endblock %}
+
+{% block body_scripts %}
+
+{% endblock %}
diff --git a/app/themen_matching.py b/app/themen_matching.py
new file mode 100644
index 0000000..5bb0b6f
--- /dev/null
+++ b/app/themen_matching.py
@@ -0,0 +1,371 @@
+"""Themen × Anträge Matching fuer das Aktuelle-Themen-Dashboard
+(#170 Phase 2).
+
+Verschneidet News-Artikel-Embeddings (aus news_articles.summary_embedding)
+mit Antrag-Embeddings (assessments.summary_embedding) per Cosine-Similarity.
+Liefert pro News-Artikel die Top-K-passendsten Anträge.
+
+Reuse:
+- ``embeddings.cosine_similarity`` fuer den Vektor-Vergleich
+- Beide Tabellen nutzen denselben Embedding-Modell-Vektorraum (qwen v4),
+ daher direkter Cross-Vergleich moeglich
+- Filter ueber ``embedding_model``-Spalte, falls Migration laueft
+"""
+from __future__ import annotations
+
+import json
+import logging
+import sqlite3
+from collections import defaultdict
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+
+def _load_embeddings(
+    db_path: Path,
+    table: str,
+    select_cols: list[str],
+    where_extra: str = "",
+    params: tuple = (),
+) -> list[dict]:
+    """Load rows that carry a usable ``summary_embedding`` vector.
+
+    Generic loader for tables that have ``summary_embedding`` and
+    ``embedding_model`` columns.
+
+    Args:
+        db_path: SQLite database file. A missing file yields ``[]``.
+        table: Table name; interpolated into the statement text.
+        select_cols: Column names to select; interpolated into the
+            statement text.
+        where_extra: Optional SQL fragment appended to the ``WHERE`` clause;
+            interpolated into the statement text.
+        params: Values bound to the ``?`` placeholders of ``where_extra``.
+
+    Returns:
+        One dict per kept row holding the selected columns,
+        ``summary_embedding``, ``embedding_model`` and ``_vec`` (the decoded
+        embedding). A row is dropped when its ``embedding_model`` differs
+        from ``embeddings.EMBEDDING_MODEL_READ`` or when the stored value
+        does not decode to a non-empty JSON array.
+    """
+    from . import embeddings as emb
+
+    if not Path(db_path).exists():
+        return []
+
+    # NOTE: table, select_cols and where_extra become part of the SQL text.
+    # Pass code-level constants only; request data belongs in ``params``.
+    sql = (
+        f"SELECT {', '.join(select_cols)}, summary_embedding, embedding_model "
+        f"FROM {table} "
+        f"WHERE summary_embedding IS NOT NULL {where_extra}"
+    )
+    conn = sqlite3.connect(str(db_path))
+    try:
+        conn.row_factory = sqlite3.Row
+        rows = conn.execute(sql, params).fetchall()
+    finally:
+        conn.close()
+
+    out = []
+    for r in rows:
+        if r["embedding_model"] != emb.EMBEDDING_MODEL_READ:
+            continue
+        try:
+            vec = json.loads(r["summary_embedding"])
+        except (ValueError, TypeError):
+            # ValueError covers JSONDecodeError and, for BLOB bytes that are
+            # not valid UTF-8, UnicodeDecodeError.
+            continue
+        if not isinstance(vec, list) or not vec:
+            # Valid JSON but not a vector (null, number, "", []): skip it
+            # here instead of letting the similarity step fail later.
+            continue
+        d = dict(r)
+        d["_vec"] = vec
+        out.append(d)
+    return out
+
+
+def find_anträge_for_news(
+    news_url: str,
+    top_k: int = 5,
+    min_similarity: float = 0.4,
+    db_path: Optional[Path] = None,
+) -> list[dict]:
+    """Return the assessments most similar to one news article.
+
+    Args:
+        news_url: Primary key of the article in ``news_articles``.
+        top_k: Maximum number of matches returned.
+        min_similarity: Cosine-similarity cut-off; lower scores are dropped.
+            The original author chose 0.4 as the empirical point from which
+            qwen-v4 embeddings match in a semantically relevant way.
+        db_path: SQLite file; defaults to ``settings.db_path``.
+
+    Returns:
+        Up to ``top_k`` dicts sorted by descending ``similarity`` (rounded
+        to three decimals). Empty when the database file is missing, the
+        article is unknown, or its embedding is absent, belongs to another
+        embedding model, or does not decode to a vector.
+    """
+    from .config import settings
+    from . import embeddings as emb
+
+    def _json_list(raw):
+        # List-valued columns hold JSON text. One malformed row must not
+        # abort the whole lookup, so it degrades to an empty list.
+        if not raw:
+            return []
+        try:
+            return json.loads(raw)
+        except (ValueError, TypeError):
+            return []
+
+    path = db_path or settings.db_path
+    if not Path(path).exists():
+        return []
+
+    # 1. Load the news vector.
+    conn = sqlite3.connect(str(path))
+    try:
+        row = conn.execute(
+            """SELECT summary_embedding, embedding_model
+               FROM news_articles WHERE url=?""",
+            (news_url,),
+        ).fetchone()
+    finally:
+        conn.close()
+    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
+        return []
+    try:
+        news_vec = json.loads(row[0])
+    except (ValueError, TypeError):
+        return []
+    if not isinstance(news_vec, list) or not news_vec:
+        return []
+
+    # 2. Load every assessment that has an embedding and score it.
+    assessments = _load_embeddings(
+        Path(path),
+        "assessments",
+        ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
+         "empfehlung", "themen", "datum"],
+    )
+    scored = []
+    for a in assessments:
+        sim = emb.cosine_similarity(news_vec, a["_vec"])
+        if sim < min_similarity:
+            continue
+        scored.append({
+            "drucksache": a["drucksache"],
+            "title": a["title"],
+            "bundesland": a["bundesland"],
+            "fraktionen": _json_list(a["fraktionen"]),
+            "gwoe_score": a["gwoe_score"],
+            "empfehlung": a["empfehlung"],
+            "themen": _json_list(a["themen"]),
+            "datum": a["datum"],
+            "similarity": round(sim, 3),
+        })
+    scored.sort(key=lambda x: x["similarity"], reverse=True)
+    return scored[:top_k]
+
+
+def find_news_for_antrag(
+    drucksache: str,
+    top_k: int = 5,
+    min_similarity: float = 0.4,
+    days_window: int = 90,
+    db_path: Optional[Path] = None,
+) -> list[dict]:
+    """Return the recent news articles most similar to one assessment.
+
+    Args:
+        drucksache: Primary key of the assessment in ``assessments``.
+        top_k: Maximum number of articles returned.
+        min_similarity: Cosine-similarity cut-off; lower scores are dropped.
+        days_window: Articles older than this many days are skipped.
+            Articles whose ``datum`` cannot be parsed are kept on purpose.
+        db_path: SQLite file; defaults to ``settings.db_path``.
+
+    Returns:
+        Up to ``top_k`` dicts (url, titel, summary, datum, source, ressort,
+        tags, similarity) sorted by descending ``similarity``. Empty when
+        the database file is missing, the Drucksache is unknown, or its
+        embedding is absent, belongs to another embedding model, or does not
+        decode to a vector.
+    """
+    from .config import settings
+    from . import embeddings as emb
+
+    path = db_path or settings.db_path
+    if not Path(path).exists():
+        return []
+
+    # 1. Load the assessment vector.
+    conn = sqlite3.connect(str(path))
+    try:
+        row = conn.execute(
+            """SELECT summary_embedding, embedding_model
+               FROM assessments WHERE drucksache=?""",
+            (drucksache,),
+        ).fetchone()
+    finally:
+        conn.close()
+    if not row or not row[0] or row[1] != emb.EMBEDDING_MODEL_READ:
+        return []
+    try:
+        antrag_vec = json.loads(row[0])
+    except (ValueError, TypeError):
+        return []
+    if not isinstance(antrag_vec, list) or not antrag_vec:
+        return []
+
+    # 2. Load news, apply the date window, then score what is left.
+    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
+    news = _load_embeddings(
+        Path(path),
+        "news_articles",
+        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
+    )
+    scored = []
+    for n in news:
+        # Date check first: it is far cheaper than the vector comparison.
+        try:
+            news_ts = datetime.fromisoformat(
+                n["datum"].replace("Z", "+00:00")
+            ).timestamp()
+        except (ValueError, AttributeError):
+            news_ts = None  # unparseable date: let the article through
+        if news_ts is not None and news_ts < cutoff:
+            continue
+
+        sim = emb.cosine_similarity(antrag_vec, n["_vec"])
+        if sim < min_similarity:
+            continue
+
+        try:
+            tags = json.loads(n["tags"]) if n["tags"] else []
+        except (ValueError, TypeError):
+            tags = []
+        scored.append({
+            "url": n["url"],
+            "titel": n["titel"],
+            "summary": n["summary"],
+            "datum": n["datum"],
+            "source": n["source"],
+            "ressort": n["ressort"],
+            "tags": tags,
+            "similarity": round(sim, 3),
+        })
+    scored.sort(key=lambda x: x["similarity"], reverse=True)
+    return scored[:top_k]
+
+
+def aggregate_top_themen(
+    days_window: int = 7,
+    top_k: int = 10,
+    min_similarity: float = 0.4,
+    matches_per_news: int = 3,
+    db_path: Optional[Path] = None,
+) -> dict:
+    """Return the newest news articles, each with its best-matching assessments.
+
+    Primary data source of the dashboard.
+
+    Args:
+        days_window: Only articles from the last N days are considered.
+            Articles whose ``datum`` cannot be parsed are skipped.
+        top_k: Number of newest articles kept.
+        min_similarity: Cosine-similarity cut-off for a match.
+        matches_per_news: Maximum number of matches kept per article.
+        db_path: SQLite file; defaults to ``settings.db_path``.
+
+    Returns:
+        ``{"buckets": [...], "n_total_news": int, "filter": {...}}``.
+        Each bucket is ``{"news": {...}, "matches": [...]}``. ``news``
+        carries url, titel, summary, datum, source, ressort and tags; each
+        match carries drucksache, title, bundesland, fraktionen, gwoe_score,
+        empfehlung, datum and similarity, best match first.
+        ``n_total_news`` counts all loaded articles with a usable embedding,
+        before the date window and ``top_k`` are applied. ``filter`` echoes
+        the four filter arguments on every return path.
+    """
+    from .config import settings
+    from . import embeddings as emb
+
+    def _json_list(raw):
+        # List-valued columns hold JSON text. A malformed value degrades to
+        # an empty list instead of failing the whole dashboard request.
+        if not raw:
+            return []
+        try:
+            return json.loads(raw)
+        except (ValueError, TypeError):
+            return []
+
+    applied_filter = {
+        "days_window": days_window,
+        "top_k": top_k,
+        "min_similarity": min_similarity,
+        "matches_per_news": matches_per_news,
+    }
+
+    path = db_path or settings.db_path
+    if not Path(path).exists():
+        return {"buckets": [], "n_total_news": 0, "filter": applied_filter}
+
+    cutoff = datetime.now(timezone.utc).timestamp() - days_window * 86400
+
+    news_rows = _load_embeddings(
+        Path(path),
+        "news_articles",
+        ["url", "titel", "summary", "datum", "source", "ressort", "tags"],
+    )
+
+    # Keep articles inside the window, newest first, at most top_k of them.
+    fresh = []
+    for n in news_rows:
+        try:
+            news_ts = datetime.fromisoformat(
+                n["datum"].replace("Z", "+00:00")
+            ).timestamp()
+        except (ValueError, AttributeError):
+            continue
+        if news_ts < cutoff:
+            continue
+        fresh.append((news_ts, n))
+    fresh.sort(key=lambda pair: pair[0], reverse=True)
+    fresh = fresh[:top_k]
+
+    # Build the per-assessment payload once; it does not depend on the
+    # article, so decoding it inside the news loop would repeat the work.
+    candidates = []
+    if fresh:
+        assessments = _load_embeddings(
+            Path(path),
+            "assessments",
+            ["drucksache", "title", "bundesland", "fraktionen", "gwoe_score",
+             "empfehlung", "themen", "datum"],
+        )
+        for a in assessments:
+            candidates.append((a["_vec"], {
+                "drucksache": a["drucksache"],
+                "title": a["title"],
+                "bundesland": a["bundesland"],
+                "fraktionen": _json_list(a["fraktionen"]),
+                "gwoe_score": a["gwoe_score"],
+                "empfehlung": a["empfehlung"],
+                "datum": a["datum"],
+            }))
+
+    buckets = []
+    for _, n in fresh:
+        scored = []
+        for vec, payload in candidates:
+            sim = emb.cosine_similarity(n["_vec"], vec)
+            if sim < min_similarity:
+                continue
+            scored.append({**payload, "similarity": round(sim, 3)})
+        scored.sort(key=lambda x: x["similarity"], reverse=True)
+        buckets.append({
+            "news": {
+                "url": n["url"],
+                "titel": n["titel"],
+                "summary": n["summary"],
+                "datum": n["datum"],
+                "source": n["source"],
+                "ressort": n["ressort"],
+                "tags": _json_list(n["tags"]),
+            },
+            "matches": scored[:matches_per_news],
+        })
+
+    return {
+        "buckets": buckets,
+        "n_total_news": len(news_rows),
+        "filter": applied_filter,
+    }
+
+
+def aggregate_themen_zeitreihe(
+    days_window: int = 30,
+    db_path: Optional[Path] = None,
+) -> dict:
+    """Count news articles per (day, source) over the last ``days_window`` days.
+
+    Feeds the stacked-area chart. No assessment matching happens here; the
+    result only shows how active each source was.
+
+    Returns:
+        ``{"buckets": [day, ...], "sources": [source, ...],
+        "series": {source: [count per day, ...]}}`` with days and sources
+        sorted ascending. Days are the first ten characters of ``datum``.
+        Rows with an empty or unparseable ``datum`` are ignored. All three
+        values are empty when the database file does not exist.
+    """
+    from .config import settings
+
+    db_file = db_path or settings.db_path
+    if not Path(db_file).exists():
+        return {"buckets": [], "sources": [], "series": {}}
+
+    oldest_allowed = (
+        datetime.now(timezone.utc).timestamp() - days_window * 86400
+    )
+
+    conn = sqlite3.connect(str(db_file))
+    try:
+        cursor = conn.execute("SELECT datum, source FROM news_articles")
+        rows = cursor.fetchall()
+    finally:
+        conn.close()
+
+    per_day_source: defaultdict[tuple[str, str], int] = defaultdict(int)
+    for datum, source in rows:
+        if not datum:
+            continue
+        try:
+            parsed = datetime.fromisoformat(datum.replace("Z", "+00:00"))
+            ts = parsed.timestamp()
+        except (ValueError, AttributeError):
+            continue
+        if ts >= oldest_allowed:
+            # YYYY-MM-DD prefix of the stored string is the bucket key.
+            per_day_source[(datum[:10], source)] += 1
+
+    days_sorted = sorted({day for day, _ in per_day_source})
+    sources_sorted = sorted({src for _, src in per_day_source})
+    series = {}
+    for src in sources_sorted:
+        series[src] = [
+            per_day_source.get((day, src), 0) for day in days_sorted
+        ]
+
+    return {
+        "buckets": days_sorted,
+        "sources": sources_sorted,
+        "series": series,
+    }
diff --git a/scripts/auto-fetch-news.sh b/scripts/auto-fetch-news.sh
new file mode 100755
index 0000000..7c69d9f
--- /dev/null
+++ b/scripts/auto-fetch-news.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Aktuelle-Themen dashboard: news aggregator cron job (#170 phase 1).
+#
+# Fetches headlines daily from the Tagesschau API and the Bundestag RSS feed,
+# persists them in news_articles and embeds the new ones via the Qwen
+# embeddings API. Idempotent (URL is the primary key), safe to re-run after
+# failures.
+#
+# Invoked by cron every morning, before auto-ingest-protocols.sh.
+#
+# Usage:
+# auto-fetch-news.sh [CONTAINER]
+set -euo pipefail
+
+# Container name: first argument, falling back to the default below.
+CONTAINER="${1:-gwoe-antragspruefer}"
+
+echo "=== auto-fetch-news $(date -Iseconds) ==="
+
+# The quoted heredoc delimiter keeps the shell from expanding anything in
+# the Python snippet, which is fed to the interpreter inside the container.
+docker exec -i "$CONTAINER" python <<'EOF'
+from app.news_aggregator import run_aggregator
+stats = run_aggregator()
+print(f"News-Aggregator: inserted={stats['inserted']} updated={stats['updated']} embedded={stats['embedded']}")
+EOF
+
+echo "=== auto-fetch-news done $(date -Iseconds) ==="
diff --git a/tests/test_news_aggregator.py b/tests/test_news_aggregator.py
new file mode 100644
index 0000000..84c41bc
--- /dev/null
+++ b/tests/test_news_aggregator.py
@@ -0,0 +1,262 @@
+"""Tests fuer app.news_aggregator (#170 Phase 1).
+
+Testet Parser + DB-Persistierung gegen kontrollierte Fixtures, ohne
+Live-HTTP-Calls (Tagesschau-API + Bundestag-RSS werden gemockt).
+"""
+from __future__ import annotations
+
+import json
+import sqlite3
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from app.news_aggregator import (
+ _parse_rss_date,
+ _strip_html,
+ fetch_rss,
+ fetch_tagesschau,
+ upsert_articles,
+)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Helper
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestStripHtml:
+ def test_removes_tags(self):
+ assert _strip_html("
Hello world
") == "Hello world"
+
+ def test_decodes_cdata(self):
+ assert "Test" in _strip_html("")
+
+ def test_decodes_entities(self):
+ assert _strip_html("a & b") == "a & b"
+
+ def test_collapses_whitespace(self):
+ assert _strip_html("
a b\n c
") == "a b c"
+
+ def test_empty(self):
+ assert _strip_html("") == ""
+
+
+class TestParseRssDate:
+    """``_parse_rss_date``: RFC 822 dates become ISO strings, junk becomes ``""``."""
+
+    def test_rfc822_to_iso(self):
+        iso = _parse_rss_date("Tue, 28 Apr 2026 10:45:12 GMT")
+        assert iso.startswith("2026-04-28")
+
+    def test_invalid_returns_empty(self):
+        for unusable in ("garbage", ""):
+            assert _parse_rss_date(unusable) == ""
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# fetch_tagesschau (mocked HTTP)
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+SAMPLE_TAGESSCHAU_JSON = json.dumps({
+ "news": [
+ {
+ "title": "Bundestag berät über Wohnungsbau",
+ "firstSentence": "Der Bundestag hat heute über das neue Wohnungsbau-Gesetz beraten.",
+ "shareURL": "https://www.tagesschau.de/inland/bundestag-wohnungsbau-100.html",
+ "date": "2026-04-28T10:00:00.000+02:00",
+ "ressort": "inland",
+ "tags": [{"tag": "Wohnungsbau"}, {"tag": "Bundestag"}],
+ },
+ {
+ "title": "EU-Kommission stellt Klimapaket vor",
+ "firstSentence": "Die EU plant ehrgeizige Klimaziele.",
+ "shareURL": "https://www.tagesschau.de/ausland/eu-klima-100.html",
+ "date": "2026-04-28T11:00:00.000+02:00",
+ "ressort": "ausland",
+ "tags": [{"tag": "Klima"}, {"tag": "EU"}],
+ },
+ {
+ # Dieser hat keinen shareURL — sollte uebersprungen werden
+ "title": "Kein Link",
+ "firstSentence": "Skip mich",
+ },
+ ],
+}).encode("utf-8")
+
+
+class TestFetchTagesschau:
+    """``fetch_tagesschau`` against a canned payload; ``_http_get`` is patched."""
+
+    @staticmethod
+    def _fetch(ressorts, payload):
+        # Every test replaces the HTTP layer, so no request leaves the process.
+        with patch("app.news_aggregator._http_get", return_value=payload):
+            return fetch_tagesschau(ressorts=ressorts)
+
+    def test_parses_news_array(self):
+        articles = self._fetch(["inland"], SAMPLE_TAGESSCHAU_JSON)
+        # De-duplication by URL -> 2 unique
+        assert len(articles) == 2
+        first = articles[0]
+        expected = {
+            "url": "https://www.tagesschau.de/inland/bundestag-wohnungsbau-100.html",
+            "titel": "Bundestag berät über Wohnungsbau",
+            "source": "tagesschau",
+            "ressort": "inland",
+        }
+        for key, value in expected.items():
+            assert first[key] == value
+        assert "Wohnungsbau" in first["summary"]
+        assert "Wohnungsbau" in first["tags"]
+
+    def test_skips_items_without_link(self):
+        articles = self._fetch(["inland"], SAMPLE_TAGESSCHAU_JSON)
+        assert all(article["url"] for article in articles)
+
+    def test_returns_empty_on_http_error(self):
+        assert self._fetch(["inland"], None) == []
+
+    def test_dedup_across_ressorts(self):
+        """An item that appears in two ressorts is delivered only once."""
+        articles = self._fetch(["inland", "ausland"], SAMPLE_TAGESSCHAU_JSON)
+        urls = [article["url"] for article in articles]
+        assert len(set(urls)) == len(urls)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# fetch_rss (mocked HTTP)
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+SAMPLE_RSS = """
+
BT Aktuell
+-
+
+ https://www.bundestag.de/dokumente/textarchiv/2026/kw18-wohnungsbau-1170388
+
+Tue, 28 Apr 2026 10:45:12 GMT
+
+-
+
Antrag zur Klimapolitik
+ https://www.bundestag.de/klima
+Klimaschutz im Bundestag
+Mon, 27 Apr 2026 10:00:00 GMT
+
+""".encode("utf-8")
+
+
+class TestFetchRss:
+ def test_parses_rss_items(self):
+ with patch("app.news_aggregator._http_get", return_value=SAMPLE_RSS):
+ articles = fetch_rss("bundestag-aktuell", "https://example.com/rss")
+ assert len(articles) == 2
+ first = articles[0]
+ assert "Wohnungsbau" in first["titel"]
+ assert first["url"].startswith("https://www.bundestag.de")
+ assert first["source"] == "bundestag-aktuell"
+ assert first["datum"].startswith("2026-04-28")
+ assert "Bundestag" in first["summary"]
+
+ def test_strips_cdata_and_html(self):
+ with patch("app.news_aggregator._http_get", return_value=SAMPLE_RSS):
+ articles = fetch_rss("bundestag-aktuell", "https://example.com/rss")
+ for a in articles:
+ assert "
+ Nur Titel
+ nur-link
+ """
+ with patch("app.news_aggregator._http_get", return_value=bad):
+ articles = fetch_rss("x", "https://example.com/rss")
+ assert articles == []
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# upsert_articles
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture
+def empty_db(tmp_path: Path) -> Path:
+    """Create a SQLite file holding only an empty ``news_articles`` table."""
+    db_file = tmp_path / "test_news.db"
+    create_news_articles = """
+        CREATE TABLE news_articles (
+            url TEXT PRIMARY KEY,
+            titel TEXT NOT NULL,
+            summary TEXT,
+            datum TEXT NOT NULL,
+            source TEXT NOT NULL,
+            ressort TEXT,
+            tags TEXT,
+            summary_embedding BLOB,
+            embedding_model TEXT,
+            fetched_at TEXT NOT NULL DEFAULT (datetime('now'))
+        )
+    """
+    connection = sqlite3.connect(str(db_file))
+    connection.execute(create_news_articles)
+    connection.commit()
+    connection.close()
+    return db_file
+
+
+SAMPLE_ARTICLES = [
+ {
+ "url": "https://example.com/a",
+ "titel": "Wohnungsbau",
+ "summary": "Heute im Bundestag",
+ "datum": "2026-04-28",
+ "source": "tagesschau",
+ "ressort": "inland",
+ "tags": ["Wohnungsbau"],
+ },
+ {
+ "url": "https://example.com/b",
+ "titel": "Klima",
+ "summary": "EU plant Klimaziele",
+ "datum": "2026-04-28",
+ "source": "tagesschau",
+ "ressort": "ausland",
+ "tags": ["Klima", "EU"],
+ },
+]
+
+
+class TestUpsertArticles:
+    """``upsert_articles`` with embedding switched off (``embed=False``)."""
+
+    def test_inserts_new_articles(self, empty_db):
+        stats = upsert_articles(SAMPLE_ARTICLES, db_path=empty_db, embed=False)
+        assert (stats["inserted"], stats["updated"]) == (2, 0)
+
+    def test_updates_existing_articles(self, empty_db):
+        upsert_articles(SAMPLE_ARTICLES, db_path=empty_db, embed=False)
+
+        # Second run: same URLs, changed titel.
+        modified = []
+        for article in SAMPLE_ARTICLES:
+            modified.append({**article, "titel": article["titel"] + " (neu)"})
+        stats = upsert_articles(modified, db_path=empty_db, embed=False)
+        assert (stats["updated"], stats["inserted"]) == (2, 0)
+
+        # The stored title must be the new one.
+        connection = sqlite3.connect(str(empty_db))
+        cursor = connection.execute(
+            "SELECT titel FROM news_articles WHERE url=?",
+            (SAMPLE_ARTICLES[0]["url"],),
+        )
+        row = cursor.fetchone()
+        connection.close()
+        assert row[0].endswith("(neu)")
+
+    def test_persists_tags_as_json(self, empty_db):
+        upsert_articles(SAMPLE_ARTICLES, db_path=empty_db, embed=False)
+        connection = sqlite3.connect(str(empty_db))
+        cursor = connection.execute(
+            "SELECT tags FROM news_articles WHERE url=?",
+            (SAMPLE_ARTICLES[0]["url"],),
+        )
+        row = cursor.fetchone()
+        connection.close()
+        assert json.loads(row[0]) == ["Wohnungsbau"]
+
+    def test_missing_db_returns_zeros(self, tmp_path):
+        missing = tmp_path / "missing.db"
+        stats = upsert_articles(SAMPLE_ARTICLES, db_path=missing, embed=False)
+        assert stats == {"inserted": 0, "updated": 0, "embedded": 0}
diff --git a/tests/test_presse_generator.py b/tests/test_presse_generator.py
new file mode 100644
index 0000000..4c20af9
--- /dev/null
+++ b/tests/test_presse_generator.py
@@ -0,0 +1,224 @@
+"""Tests fuer app.presse_generator (#170 Phase 4)."""
+from __future__ import annotations
+
+import json
+import sqlite3
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from app.presse_generator import (
+ _build_user_prompt,
+ generate_draft,
+ get_draft,
+ list_drafts,
+)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Fixture: DB mit Antrag + News
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture
+def db_with_antrag_and_news(tmp_path: Path) -> Path:
+    """Create a SQLite file with one assessment, one news article, no drafts."""
+    db_file = tmp_path / "test_presse.db"
+    schema = (
+        """
+        CREATE TABLE assessments (
+            drucksache TEXT PRIMARY KEY,
+            title TEXT,
+            bundesland TEXT,
+            antrag_zusammenfassung TEXT,
+            gwoe_score REAL,
+            gwoe_begruendung TEXT,
+            empfehlung TEXT
+        )
+        """,
+        """
+        CREATE TABLE news_articles (
+            url TEXT PRIMARY KEY,
+            titel TEXT NOT NULL,
+            summary TEXT
+        )
+        """,
+        """
+        CREATE TABLE presse_drafts (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            drucksache TEXT NOT NULL,
+            bundesland TEXT NOT NULL,
+            news_url TEXT NOT NULL,
+            news_titel TEXT NOT NULL,
+            titel TEXT NOT NULL,
+            body TEXT NOT NULL,
+            model TEXT NOT NULL,
+            created_at TEXT NOT NULL DEFAULT (datetime('now'))
+        )
+        """,
+    )
+    antrag_row = (
+        "18/A", "Wohnungsbau-Reform-Antrag", "NRW",
+        "Antrag fuer mehr sozialen Wohnungsbau",
+        8.5, "Stark gemeinwohlorientiert",
+        "Uneingeschränkt unterstützen",
+    )
+    news_row = (
+        "https://example.com/wohnen",
+        "Wohnungsmarkt im Umbruch",
+        "Die Mietpreise steigen weiter, der Bundestag berät heute",
+    )
+
+    connection = sqlite3.connect(str(db_file))
+    for ddl in schema:
+        connection.execute(ddl)
+    connection.execute(
+        """INSERT INTO assessments
+           (drucksache, title, bundesland, antrag_zusammenfassung,
+            gwoe_score, gwoe_begruendung, empfehlung)
+           VALUES (?, ?, ?, ?, ?, ?, ?)""",
+        antrag_row,
+    )
+    connection.execute(
+        "INSERT INTO news_articles (url, titel, summary) VALUES (?, ?, ?)",
+        news_row,
+    )
+    connection.commit()
+    connection.close()
+    return db_file
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# _build_user_prompt
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestBuildUserPrompt:
+    """``_build_user_prompt`` embeds its inputs and marks a missing summary."""
+
+    def test_includes_drucksache(self):
+        fields = dict(
+            drucksache="18/A", bundesland="NRW",
+            antrag_titel="Test", antrag_zusammenfassung="Summary",
+            gwoe_score=7.5, gwoe_begruendung="ok",
+            empfehlung="Unterstützen",
+            news_titel="News", news_summary="Lead",
+            news_url="https://example.com",
+        )
+        prompt = _build_user_prompt(**fields)
+        for fragment in ("18/A", "NRW", "7.5", "News"):
+            assert fragment in prompt
+
+    def test_handles_missing_zusammenfassung(self):
+        fields = dict(
+            drucksache="x", bundesland="x", antrag_titel="x",
+            antrag_zusammenfassung="", gwoe_score=5.0,
+            gwoe_begruendung="", empfehlung="",
+            news_titel="x", news_summary="", news_url="",
+        )
+        assert "(keine vorhanden)" in _build_user_prompt(**fields)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# generate_draft (mocked QwenBewerter)
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class FakeBewerter:
+    """Stand-in for QwenBewerter that answers with a fixed LLM response."""
+
+    def __init__(self, response: dict):
+        # No request seen yet; the canned answer is stored for later.
+        self.last_request = None
+        self._response = response
+
+    async def bewerte(self, request):
+        """Record ``request`` and hand back the canned response."""
+        self.last_request = request
+        canned = self._response
+        return canned
+
+
+@pytest.mark.asyncio
+async def test_generate_draft_persists_record(db_with_antrag_and_news, monkeypatch):
+    """A complete LLM response yields a draft built from the Antrag and news rows."""
+    bewerter = FakeBewerter({
+        "titel": "Wohnungsbau jetzt",
+        "body": "Der vorliegende Antrag der Drucksache 18/A ..."
+        * 10,  # long body
+    })
+    # Patch settings.llm_model_default (presumably the value stored in the
+    # draft's model column; confirm against generate_draft).
+    from app.config import settings as real_settings
+    monkeypatch.setattr(real_settings, "llm_model_default", "qwen-test")
+    result = await generate_draft(
+        drucksache="18/A",
+        news_url="https://example.com/wohnen",
+        db_path=db_with_antrag_and_news,
+        bewerter=bewerter,
+    )
+
+    assert result["id"] == 1
+    assert result["drucksache"] == "18/A"
+    assert result["bundesland"] == "NRW"
+    assert result["news_titel"] == "Wohnungsmarkt im Umbruch"
+    assert result["titel"] == "Wohnungsbau jetzt"
+    assert "18/A" in result["body"]
+
+
+@pytest.mark.asyncio
+async def test_generate_draft_unknown_drucksache(db_with_antrag_and_news):
+    """A Drucksache that is not in the database raises ``ValueError``."""
+    stub = FakeBewerter({"titel": "x", "body": "y"})
+    call_args = dict(
+        drucksache="99/MISSING",
+        news_url="https://example.com/wohnen",
+        db_path=db_with_antrag_and_news,
+        bewerter=stub,
+    )
+    with pytest.raises(ValueError, match="Drucksache"):
+        await generate_draft(**call_args)
+
+
+@pytest.mark.asyncio
+async def test_generate_draft_unknown_news(db_with_antrag_and_news):
+    """A news URL that is not in the database raises ``ValueError``."""
+    stub = FakeBewerter({"titel": "x", "body": "y"})
+    call_args = dict(
+        drucksache="18/A",
+        news_url="https://example.com/missing",
+        db_path=db_with_antrag_and_news,
+        bewerter=stub,
+    )
+    with pytest.raises(ValueError, match="News-URL"):
+        await generate_draft(**call_args)
+
+
+@pytest.mark.asyncio
+async def test_generate_draft_empty_response_raises(db_with_antrag_and_news, monkeypatch):
+    """An LLM answer with empty ``titel`` and ``body`` raises ``ValueError``."""
+    from app.config import settings as real_settings
+
+    monkeypatch.setattr(real_settings, "llm_model_default", "qwen-test")
+    stub = FakeBewerter({"titel": "", "body": ""})
+    call_args = dict(
+        drucksache="18/A",
+        news_url="https://example.com/wohnen",
+        db_path=db_with_antrag_and_news,
+        bewerter=stub,
+    )
+    with pytest.raises(ValueError, match="unvollständig"):
+        await generate_draft(**call_args)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# list_drafts + get_draft
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestListAndGetDrafts:
+    """``list_drafts`` and ``get_draft`` on an empty and a one-row table."""
+
+    def test_empty(self, db_with_antrag_and_news):
+        db = db_with_antrag_and_news
+        assert list_drafts(db_path=db) == []
+        assert get_draft(99, db_path=db) is None
+
+    def test_after_insert(self, db_with_antrag_and_news):
+        db = db_with_antrag_and_news
+        draft_row = (
+            "18/A", "NRW", "https://x.de/n", "News-Titel",
+            "PM-Titel", "PM-Body", "test-model",
+        )
+        # Test setup writes the row straight into the table.
+        connection = sqlite3.connect(str(db))
+        connection.execute(
+            """INSERT INTO presse_drafts
+               (drucksache, bundesland, news_url, news_titel, titel, body, model)
+               VALUES (?, ?, ?, ?, ?, ?, ?)""",
+            draft_row,
+        )
+        connection.commit()
+        connection.close()
+
+        drafts = list_drafts(db_path=db)
+        assert len(drafts) == 1
+        listed = drafts[0]
+        assert listed["drucksache"] == "18/A"
+        assert listed["titel"] == "PM-Titel"
+
+        fetched = get_draft(listed["id"], db_path=db)
+        assert fetched is not None
+        assert fetched["body"] == "PM-Body"
diff --git a/tests/test_themen_matching.py b/tests/test_themen_matching.py
new file mode 100644
index 0000000..6a64c41
--- /dev/null
+++ b/tests/test_themen_matching.py
@@ -0,0 +1,297 @@
+"""Tests fuer app.themen_matching (#170 Phase 2)."""
+from __future__ import annotations
+
+import json
+import sqlite3
+from datetime import datetime, timezone, timedelta
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from app.themen_matching import (
+ aggregate_themen_zeitreihe,
+ aggregate_top_themen,
+ find_anträge_for_news,
+ find_news_for_antrag,
+)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Fixture: DB mit News + Assessments + Embeddings
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+def _vec(dim: int = 8, val: float = 0.1) -> bytes:
+    """Return a constant vector of ``dim`` entries, all ``val``, as JSON bytes."""
+    components = [val for _ in range(dim)]
+    return json.dumps(components).encode()
+
+
+def _vec_from(values: list[float]) -> bytes:
+    """Serialise ``values`` as a JSON array and return it as encoded bytes."""
+    as_json = json.dumps(values)
+    return as_json.encode()
+
+
+@pytest.fixture
+def populated_db(tmp_path: Path) -> Path:
+    """Build a temporary SQLite database seeded with news and assessments.
+
+    Three ``news_articles`` rows are inserted (two dated now, one dated 200
+    days ago) together with three ``assessments`` rows. Every row carries an
+    8-component embedding stored as JSON bytes and the model tag
+    ``qwen-embedding-v4``.
+
+    Args:
+        tmp_path: pytest's per-test temporary directory.
+
+    Returns:
+        Path of the created ``test_match.db`` file.
+    """
+    db = tmp_path / "test_match.db"
+    conn = sqlite3.connect(str(db))
+    # try/finally: release the handle even if one of the seeding statements
+    # raises, so a broken fixture does not leave the connection open.
+    try:
+        conn.execute("""
+            CREATE TABLE news_articles (
+                url TEXT PRIMARY KEY,
+                titel TEXT NOT NULL,
+                summary TEXT,
+                datum TEXT NOT NULL,
+                source TEXT NOT NULL,
+                ressort TEXT,
+                tags TEXT,
+                summary_embedding BLOB,
+                embedding_model TEXT,
+                fetched_at TEXT NOT NULL DEFAULT (datetime('now'))
+            )
+        """)
+        conn.execute("""
+            CREATE TABLE assessments (
+                drucksache TEXT PRIMARY KEY,
+                title TEXT,
+                fraktionen TEXT,
+                datum TEXT,
+                link TEXT,
+                bundesland TEXT,
+                gwoe_score REAL,
+                gwoe_begruendung TEXT,
+                gwoe_matrix TEXT,
+                gwoe_schwerpunkt TEXT,
+                wahlprogramm_scores TEXT,
+                verbesserungen TEXT,
+                staerken TEXT,
+                schwaechen TEXT,
+                empfehlung TEXT,
+                empfehlung_symbol TEXT,
+                verbesserungspotenzial TEXT,
+                themen TEXT,
+                antrag_zusammenfassung TEXT,
+                antrag_kernpunkte TEXT,
+                source TEXT,
+                model TEXT,
+                created_at TEXT,
+                updated_at TEXT,
+                summary_embedding BLOB,
+                embedding_model TEXT
+            )
+        """)
+
+        # Read the clock once so both news dates derive from the same instant.
+        now_utc = datetime.now(timezone.utc)
+        today = now_utc.isoformat()
+        old = (now_utc - timedelta(days=200)).isoformat()
+
+        # News articles with differently oriented embeddings. Field order
+        # matches the column list of the INSERT below.
+        news = [
+            # Housing news (vector points along [1,0,0,...])
+            ("https://example.com/n1", "Wohnungsbau-Reform",
+             "Bundestag berät Wohnungsbau", today, "tagesschau", "inland",
+             '["Wohnungsbau"]',
+             _vec_from([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
+            # Climate news (vector points along [0,1,0,...])
+            ("https://example.com/n2", "Klimaschutzgesetz",
+             "EU plant Klimaziele", today, "tagesschau", "ausland",
+             '["Klima"]',
+             _vec_from([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
+            # Old news, meant to fall outside the time window
+            ("https://example.com/n3", "Alte News", "", old, "tagesschau", "inland",
+             '[]', _vec_from([0.5, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
+        ]
+        conn.executemany(
+            """INSERT INTO news_articles
+               (url, titel, summary, datum, source, ressort, tags,
+                summary_embedding, embedding_model)
+               VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'qwen-embedding-v4')""",
+            news,
+        )
+
+        # Assessments with embeddings:
+        # - 18/A fits the housing news (vector close to [1,0,...])
+        # - 18/B fits the climate news
+        # - 18/C is orthogonal to every news vector — should match nowhere
+        # NOTE(review): this timestamp is naive local time, unlike the
+        # UTC-aware news dates above; kept as-is because the format the app
+        # expects in created_at/updated_at is not visible here — confirm.
+        now_iso = datetime.now().isoformat()
+        assessments = [
+            ("18/A", "Wohnungsbau-Antrag", '["GRÜNE"]', "2026-04-15", "NRW",
+             8.0, "Uneingeschränkt unterstützen",
+             _vec_from([0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
+            ("18/B", "Klima-Antrag", '["SPD"]', "2026-04-16", "NRW",
+             7.0, "Unterstützen mit Änderungen",
+             _vec_from([0.0, 0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0])),
+            ("18/C", "Sonstiges", '["CDU"]', "2026-04-17", "NRW",
+             5.0, "Überarbeiten",
+             _vec_from([0.0, 0.0, 0.0, 0.0, 0.95, 0.0, 0.0, 0.0])),
+        ]
+        # created_at and updated_at share one value and sit before the
+        # embedding in the parameter order of the INSERT.
+        assessment_rows = [
+            (ds, title, fraktionen, datum, bundesland, score, empfehlung,
+             now_iso, now_iso, vec)
+            for ds, title, fraktionen, datum, bundesland, score, empfehlung, vec
+            in assessments
+        ]
+        conn.executemany(
+            """INSERT INTO assessments
+               (drucksache, title, fraktionen, datum, bundesland, gwoe_score,
+                empfehlung, themen, source, model, created_at, updated_at,
+                summary_embedding, embedding_model)
+               VALUES (?, ?, ?, ?, ?, ?, ?, '[]', 'test', 'test', ?, ?,
+                       ?, 'qwen-embedding-v4')""",
+            assessment_rows,
+        )
+
+        conn.commit()
+    finally:
+        conn.close()
+    return db
+
+
+@pytest.fixture(autouse=True)
+def mock_embedding_model():
+    """Pin ``app.embeddings.EMBEDDING_MODEL_READ`` to qwen-embedding-v4 for every test."""
+    patcher = patch("app.embeddings.EMBEDDING_MODEL_READ", "qwen-embedding-v4")
+    patcher.start()
+    try:
+        yield
+    finally:
+        # Restore the original module attribute once the test has finished.
+        patcher.stop()
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# find_anträge_for_news
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestFindAnträgeForNews:
+    """Matching from a single news URL to the assessed motions."""
+
+    def test_wohnungsbau_news_matches_wohnungsbau_antrag(self, populated_db):
+        """Housing news n1 ranks motion 18/A first with similarity above 0.9."""
+        hits = find_anträge_for_news(
+            "https://example.com/n1",
+            db_path=populated_db,
+            min_similarity=0.5,
+        )
+        assert len(hits) >= 1
+        best = hits[0]
+        # The top match should be 18/A.
+        assert best["drucksache"] == "18/A"
+        assert best["similarity"] > 0.9
+
+    def test_klima_news_matches_klima_antrag(self, populated_db):
+        """Climate news n2 ranks motion 18/B first."""
+        hits = find_anträge_for_news(
+            "https://example.com/n2",
+            db_path=populated_db,
+            min_similarity=0.5,
+        )
+        assert len(hits) >= 1
+        assert hits[0]["drucksache"] == "18/B"
+
+    def test_min_similarity_filters_orthogonal(self, populated_db):
+        """With a high min_similarity cut-off no orthogonal motion may remain."""
+        hits = find_anträge_for_news(
+            "https://example.com/n1",
+            db_path=populated_db,
+            min_similarity=0.9,
+        )
+        matched_ids = tuple(hit["drucksache"] for hit in hits)
+        # 18/C is orthogonal to every news vector in the fixture.
+        assert "18/C" not in matched_ids
+
+    def test_unknown_news_returns_empty(self, populated_db):
+        """A URL that is not stored yields an empty list."""
+        hits = find_anträge_for_news("https://example.com/missing", db_path=populated_db)
+        assert hits == []
+
+    def test_empty_db(self, tmp_path):
+        """A database path that does not exist yields an empty list."""
+        missing_db = tmp_path / "missing.db"
+        hits = find_anträge_for_news("x", db_path=missing_db)
+        assert hits == []
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# find_news_for_antrag
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestFindNewsForAntrag:
+    """Matching from a single motion (Drucksache) to the stored news."""
+
+    def test_wohnungsbau_antrag_matches_wohnungsbau_news(self, populated_db):
+        """Motion 18/A ranks the housing news n1 first."""
+        hits = find_news_for_antrag("18/A", db_path=populated_db, min_similarity=0.5)
+        assert len(hits) >= 1
+        assert hits[0]["url"] == "https://example.com/n1"
+
+    def test_old_news_filtered_out(self, populated_db):
+        """The 200-day-old article must not appear inside a 90-day window."""
+        hits = find_news_for_antrag(
+            "18/A",
+            db_path=populated_db,
+            min_similarity=0.0,
+            days_window=90,
+        )
+        seen_urls = tuple(hit["url"] for hit in hits)
+        assert "https://example.com/n3" not in seen_urls
+
+    def test_top_k_limits(self, populated_db):
+        """With top_k=1 at most the single best match is returned."""
+        hits = find_news_for_antrag(
+            "18/A",
+            db_path=populated_db,
+            min_similarity=0.0,
+            top_k=1,
+        )
+        assert len(hits) <= 1
+
+    def test_unknown_antrag(self, populated_db):
+        """A Drucksache id that is not stored yields an empty list."""
+        hits = find_news_for_antrag("99/Missing", db_path=populated_db)
+        assert hits == []
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# aggregate_top_themen
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestAggregateTopThemen:
+    """Result shape and filter behaviour of ``aggregate_top_themen``."""
+
+    def test_returns_buckets(self, populated_db):
+        """Both of today's news articles end up in a bucket."""
+        result = aggregate_top_themen(db_path=populated_db, min_similarity=0.5)
+        # Two articles are dated today, and each of them has a match.
+        bucket_count = len(result["buckets"])
+        assert bucket_count == 2
+        assert "n_total_news" in result
+
+    def test_each_bucket_has_news_and_matches(self, populated_db):
+        """Each bucket has ``news`` (with ``url`` and ``titel``) and ``matches``."""
+        result = aggregate_top_themen(db_path=populated_db, min_similarity=0.5)
+        for bucket in result["buckets"]:
+            for key in ("news", "matches"):
+                assert key in bucket
+            for key in ("url", "titel"):
+                assert key in bucket["news"]
+
+    def test_days_window_filter(self, populated_db):
+        """A small window keeps only the fresh news; the old article is dropped."""
+        result = aggregate_top_themen(
+            db_path=populated_db,
+            days_window=7,
+            min_similarity=0.5,
+        )
+        bucket_urls = [bucket["news"]["url"] for bucket in result["buckets"]]
+        assert "https://example.com/n3" not in bucket_urls
+
+    def test_min_similarity_filter(self, populated_db):
+        """With a high min_similarity the cross-topic matches disappear."""
+        result = aggregate_top_themen(db_path=populated_db, min_similarity=0.99)
+        # Only (near-)exact matches should survive the cut-off.
+        similarities = [
+            match["similarity"]
+            for bucket in result["buckets"]
+            for match in bucket["matches"]
+        ]
+        assert all(sim > 0.99 for sim in similarities)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# aggregate_themen_zeitreihe
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestAggregateZeitreihe:
+    """Structure and time-window behaviour of ``aggregate_themen_zeitreihe``."""
+
+    def test_structure(self, populated_db):
+        """The result exposes ``buckets``, ``sources`` and ``series``."""
+        result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
+        for key in ("buckets", "sources", "series"):
+            assert key in result
+
+    def test_only_recent(self, populated_db):
+        """With days_window=7 the old article must not be counted."""
+        result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
+        # Only today's articles (n1, n2) count — n3 is 200 days old.
+        counted = sum(n for counts in result["series"].values() for n in counts)
+        assert counted == 2
+
+    def test_series_aligned(self, populated_db):
+        """Per source, the series list is exactly as long as ``buckets``."""
+        result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
+        expected_len = len(result["buckets"])
+        for source in result["sources"]:
+            assert len(result["series"][source]) == expected_len