gwoe-antragspruefer/tests/test_themen_matching.py
Dotty Dotter d30fcb132a feat: Stand-Dashboard, Score-Histogram, PM-Markdown, Live-Polling, Cluster-Indicator
Sechs zusammengehoerige UX/Performance-Erweiterungen:

**1. /v2/admin/stand — System-Stand-Dashboard**
KPI-Kacheln (Bewertungen, Plenum-Votes, Match, Vote-Orphans, News, PM-
Drafts, Bookmarks) + GWÖ-Score-Histogram + Per-BL-Tabelle + News-Source-
Tabelle. Auto-Refresh 30 s. Endpoint /api/admin/stand liefert alles in
einem Roundtrip. Nav-Eintrag "Stand" in der Admin-Sektion.

**2. /auswertungen Score-Histogram-Tab**
4. Tab "Score-Verteilung" mit Bar-Chart 0–10. Endpoint
/api/auswertungen/score-histogram liefert Buckets, optional gefiltert
nach Bundesland + Wahlperiode. Reagiert auf den globalen BL-Filter.

**3. PM-Body Markdown-Rendering**
Mini-Renderer im Modal: **bold** / __bold__ / *italic* / _italic_ /
- list-bullets / Doppel-Newline-Paragraphen. Kein externer Markdown-
Parser, keine neue Dependency. Body wird HTML-escaped, Patterns dann
zu Tags umgesetzt.

**4. Performance-Cache fuer themen_matching**
TTL-Cache (60 s) fuer aggregate_top_themen und aggregate_news_cluster.
Cache-Key inkl. aller Filter-Parameter. Automatische Invalidation in
news_aggregator.run_aggregator nach erfolgreichem Insert/Embed.
4 neue Tests fuer cache_get/set/clear-Verhalten.

**5. Stimmverhalten Banner Live-Update**
Statt setTimeout(800) jetzt pollQueueUntilDrained: alle 4 s
GET /api/queue/status, Banner zeigt pending + elapsed live. Bei
pending=0 zwei Polls in Folge: Banner + Stimmverhalten-Charts neu
laden. Max 5 Min Polling-Timeout. Bricht ab wenn Tab gewechselt wird.

**6. Antrag-Detail Cluster-Indicator**
News-Match-Box im Antrag-Detail laedt parallel /aktuelle-themen/cluster
und mappt URL → Cluster. Pro News-Card ein "🔗 Cluster (N News)"-Badge
mit Hover-Tooltip der anderen Cluster-Members. Macht thematische
Bündel sichtbar, ohne Pop-Out auf den Cluster-Tab.

Suite: 1088 → 1092 grün (4 Cache-Tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 02:49:06 +02:00

495 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests fuer app.themen_matching (#170 Phase 2)."""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime, timezone, timedelta
from pathlib import Path
from unittest.mock import patch
import pytest
from app.themen_matching import (
aggregate_news_cluster,
aggregate_themen_zeitreihe,
aggregate_top_antraege_with_news,
aggregate_top_themen,
cache_clear,
compute_relevance,
find_anträge_for_news,
find_news_for_antrag,
)
# ─────────────────────────────────────────────────────────────────────────────
# Fixture: DB mit News + Assessments + Embeddings
# ─────────────────────────────────────────────────────────────────────────────
def _vec(dim: int = 8, val: float = 0.1) -> bytes:
"""Konstruiert einen einfachen Vektor als JSON-Bytes."""
return json.dumps([val] * dim).encode()
def _vec_from(values: list[float]) -> bytes:
return json.dumps(values).encode()
@pytest.fixture
def populated_db(tmp_path: Path) -> Path:
    """Create a temporary SQLite DB holding news, assessments and embeddings.

    The database contains:

    * ``news_articles``: two articles dated "now" (n1 housing, n2 climate)
      and one article dated 200 days ago (n3).
    * ``assessments``: 18/A (vector close to n1), 18/B (vector close to n2)
      and 18/C (vector orthogonal to every news vector).

    All embeddings are 8-dimensional vectors stored as JSON bytes, tagged
    with the embedding model ``qwen-embedding-v4``.

    Returns:
        Path to the created database file inside ``tmp_path``.
    """
    db = tmp_path / "test_match.db"

    # Take one reference instant so both news dates derive from the same clock.
    now = datetime.now(timezone.utc)
    today = now.isoformat()
    old = (now - timedelta(days=200)).isoformat()
    # NOTE(review): created_at/updated_at deliberately stay a naive local
    # timestamp as before; confirm whether the app expects UTC here.
    now_iso = datetime.now().isoformat()

    # News articles with differently oriented embeddings.
    news = [
        # Housing news (vector oriented along [1,0,0,...])
        ("https://example.com/n1", "Wohnungsbau-Reform",
         "Bundestag berät Wohnungsbau", today, "tagesschau", "inland",
         '["Wohnungsbau"]',
         _vec_from([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
        # Climate news (vector oriented along [0,1,0,...])
        ("https://example.com/n2", "Klimaschutzgesetz",
         "EU plant Klimaziele", today, "tagesschau", "ausland",
         '["Klima"]',
         _vec_from([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
        # Old news, expected to fall outside the time windows under test
        ("https://example.com/n3", "Alte News", "", old, "tagesschau", "inland",
         '[]', _vec_from([0.5, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
    ]

    # Assessments with embeddings:
    # - 18/A fits the housing news (vector ~[1,0,...])
    # - 18/B fits the climate news
    # - 18/C is orthogonal and should not match anything
    assessments = [
        ("18/A", "Wohnungsbau-Antrag", '["GRÜNE"]', "2026-04-15", "NRW",
         8.0, "Uneingeschränkt unterstützen",
         _vec_from([0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
        ("18/B", "Klima-Antrag", '["SPD"]', "2026-04-16", "NRW",
         7.0, "Unterstützen mit Änderungen",
         _vec_from([0.0, 0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0])),
        ("18/C", "Sonstiges", '["CDU"]', "2026-04-17", "NRW",
         5.0, "Überarbeiten",
         _vec_from([0.0, 0.0, 0.0, 0.0, 0.95, 0.0, 0.0, 0.0])),
    ]

    conn = sqlite3.connect(str(db))
    try:
        conn.execute("""
            CREATE TABLE news_articles (
                url TEXT PRIMARY KEY,
                titel TEXT NOT NULL,
                summary TEXT,
                datum TEXT NOT NULL,
                source TEXT NOT NULL,
                ressort TEXT,
                tags TEXT,
                summary_embedding BLOB,
                embedding_model TEXT,
                fetched_at TEXT NOT NULL DEFAULT (datetime('now'))
            )
        """)
        conn.execute("""
            CREATE TABLE assessments (
                drucksache TEXT PRIMARY KEY,
                title TEXT,
                fraktionen TEXT,
                datum TEXT,
                link TEXT,
                bundesland TEXT,
                gwoe_score REAL,
                gwoe_begruendung TEXT,
                gwoe_matrix TEXT,
                gwoe_schwerpunkt TEXT,
                wahlprogramm_scores TEXT,
                verbesserungen TEXT,
                staerken TEXT,
                schwaechen TEXT,
                empfehlung TEXT,
                empfehlung_symbol TEXT,
                verbesserungspotenzial TEXT,
                themen TEXT,
                antrag_zusammenfassung TEXT,
                antrag_kernpunkte TEXT,
                source TEXT,
                model TEXT,
                created_at TEXT,
                updated_at TEXT,
                summary_embedding BLOB,
                embedding_model TEXT
            )
        """)
        # The news tuples are already in placeholder order.
        conn.executemany(
            """INSERT INTO news_articles
               (url, titel, summary, datum, source, ressort, tags,
                summary_embedding, embedding_model)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'qwen-embedding-v4')""",
            news,
        )
        # created_at and updated_at are spliced in before the embedding blob.
        conn.executemany(
            """INSERT INTO assessments
               (drucksache, title, fraktionen, datum, bundesland, gwoe_score,
                empfehlung, themen, source, model, created_at, updated_at,
                summary_embedding, embedding_model)
               VALUES (?, ?, ?, ?, ?, ?, ?, '[]', 'test', 'test', ?, ?,
                       ?, 'qwen-embedding-v4')""",
            [
                (ds, title, fr, dat, bl, sc, emp, now_iso, now_iso, vec)
                for ds, title, fr, dat, bl, sc, emp, vec in assessments
            ],
        )
        conn.commit()
    finally:
        # Always release the handle, even if schema creation or an insert fails.
        conn.close()
    return db
@pytest.fixture(autouse=True)
def mock_embedding_model():
    """Pin ``EMBEDDING_MODEL_READ`` to ``qwen-embedding-v4`` for every test."""
    target = "app.embeddings.EMBEDDING_MODEL_READ"
    pinned_model = "qwen-embedding-v4"
    with patch(target, pinned_model):
        yield
# ─────────────────────────────────────────────────────────────────────────────
# find_anträge_for_news
# ─────────────────────────────────────────────────────────────────────────────
class TestFindAnträgeForNews:
    """Checks for ``find_anträge_for_news`` against the ``populated_db`` fixture."""

    def test_wohnungsbau_news_matches_wohnungsbau_antrag(self, populated_db):
        """The housing news item ranks 18/A first with similarity above 0.9."""
        matches = find_anträge_for_news(
            "https://example.com/n1",
            db_path=populated_db,
            min_similarity=0.5,
        )
        assert len(matches) >= 1
        # The top match is expected to be 18/A.
        best = matches[0]
        assert best["drucksache"] == "18/A"
        assert best["similarity"] > 0.9

    def test_klima_news_matches_klima_antrag(self, populated_db):
        """The climate news item ranks 18/B first."""
        matches = find_anträge_for_news(
            "https://example.com/n2",
            db_path=populated_db,
            min_similarity=0.5,
        )
        assert len(matches) >= 1
        best = matches[0]
        assert best["drucksache"] == "18/B"

    def test_min_similarity_filters_orthogonal(self, populated_db):
        """With a high similarity cutoff the orthogonal 18/C must not appear."""
        matches = find_anträge_for_news(
            "https://example.com/n1",
            db_path=populated_db,
            min_similarity=0.9,
        )
        # 18/C is orthogonal to every news vector in the fixture.
        assert all(m["drucksache"] != "18/C" for m in matches)

    def test_unknown_news_returns_empty(self, populated_db):
        """An URL that is not in the DB yields an empty list."""
        matches = find_anträge_for_news(
            "https://example.com/missing",
            db_path=populated_db,
        )
        assert matches == []

    def test_empty_db(self, tmp_path):
        """A non-existent database path yields an empty list."""
        missing_db = tmp_path / "missing.db"
        matches = find_anträge_for_news("x", db_path=missing_db)
        assert matches == []
# ─────────────────────────────────────────────────────────────────────────────
# find_news_for_antrag
# ─────────────────────────────────────────────────────────────────────────────
class TestFindNewsForAntrag:
    """Checks for ``find_news_for_antrag`` against the ``populated_db`` fixture."""

    def test_wohnungsbau_antrag_matches_wohnungsbau_news(self, populated_db):
        """Assessment 18/A ranks the housing news item n1 first."""
        hits = find_news_for_antrag(
            "18/A",
            db_path=populated_db,
            min_similarity=0.5,
        )
        assert len(hits) >= 1
        top_hit = hits[0]
        assert top_hit["url"] == "https://example.com/n1"

    def test_old_news_filtered_out(self, populated_db):
        """The 200-day-old news item must not show up in a 90-day window."""
        hits = find_news_for_antrag(
            "18/A",
            db_path=populated_db,
            min_similarity=0.0,
            days_window=90,
        )
        assert all(h["url"] != "https://example.com/n3" for h in hits)

    def test_top_k_limits(self, populated_db):
        """``top_k=1`` returns at most one match."""
        hits = find_news_for_antrag(
            "18/A",
            db_path=populated_db,
            min_similarity=0.0,
            top_k=1,
        )
        assert len(hits) <= 1

    def test_unknown_antrag(self, populated_db):
        """An unknown Drucksache id yields an empty list."""
        hits = find_news_for_antrag("99/Missing", db_path=populated_db)
        assert hits == []
# ─────────────────────────────────────────────────────────────────────────────
# aggregate_top_themen
# ─────────────────────────────────────────────────────────────────────────────
class TestAggregateTopThemen:
    """Checks for the bucket structure returned by ``aggregate_top_themen``."""

    def test_returns_buckets(self, populated_db):
        """Both recent news items produce a bucket; the total counter is present."""
        outcome = aggregate_top_themen(
            db_path=populated_db,
            min_similarity=0.5,
        )
        # The fixture has two news items dated today, each with a match.
        bucket_count = len(outcome["buckets"])
        assert bucket_count == 2
        assert "n_total_news" in outcome

    def test_each_bucket_has_news_and_matches(self, populated_db):
        """Every bucket carries ``news`` (with url and titel) and ``matches``."""
        outcome = aggregate_top_themen(
            db_path=populated_db,
            min_similarity=0.5,
        )
        for bucket in outcome["buckets"]:
            for top_level_key in ("news", "matches"):
                assert top_level_key in bucket
            for news_key in ("url", "titel"):
                assert news_key in bucket["news"]

    def test_days_window_filter(self, populated_db):
        """A small window keeps only the fresh news; the old item drops out."""
        outcome = aggregate_top_themen(
            db_path=populated_db,
            days_window=7,
            min_similarity=0.5,
        )
        bucket_urls = [bucket["news"]["url"] for bucket in outcome["buckets"]]
        assert "https://example.com/n3" not in bucket_urls

    def test_min_similarity_filter(self, populated_db):
        """With a high minimum similarity the cross matches disappear."""
        outcome = aggregate_top_themen(
            db_path=populated_db,
            min_similarity=0.99,
        )
        # Only near-exact matches should survive.
        similarities = [
            match["similarity"]
            for bucket in outcome["buckets"]
            for match in bucket["matches"]
        ]
        assert all(sim > 0.99 for sim in similarities)
# ─────────────────────────────────────────────────────────────────────────────
# compute_relevance / aggregate_themen_zeitreihe
# ─────────────────────────────────────────────────────────────────────────────
class TestComputeRelevance:
    """Checks for ``compute_relevance`` (level, score and reason of a match list).

    Expected scores in the cases below are the product
    ``gwoe_score * similarity``. They are compared with ``pytest.approx``
    so the tests do not depend on exact binary float representation.
    """

    def test_empty_returns_none_level(self):
        """No matches yield level ``none`` and a score of zero."""
        r = compute_relevance([])
        assert r["level"] == "none"
        assert r["score"] == 0.0

    def test_high_score_high_sim_high_level(self):
        """8.0 x 0.6 = 4.8 is classified as ``high``."""
        r = compute_relevance([{
            "drucksache": "x", "title": "T", "fraktionen": ["GRÜNE"],
            "gwoe_score": 8.0, "similarity": 0.6,
        }])
        assert r["level"] == "high"
        assert r["score"] == pytest.approx(4.8)
        assert "GWÖ-8.0" in r["reason"]

    def test_low_score_low_level(self):
        """3.0 x 0.5 = 1.5 is classified as ``low``."""
        r = compute_relevance([{
            "drucksache": "x", "title": "T", "fraktionen": [],
            "gwoe_score": 3.0, "similarity": 0.5,
        }])
        assert r["level"] == "low"

    def test_mid_level(self):
        """6.0 x 0.5 = 3.0 is classified as ``mid``."""
        r = compute_relevance([{
            "drucksache": "x", "title": "T", "fraktionen": [],
            "gwoe_score": 6.0, "similarity": 0.5,
        }])
        assert r["level"] == "mid"

    def test_takes_best_match(self):
        """The best of several matches determines score and reason."""
        r = compute_relevance([
            {"gwoe_score": 5.0, "similarity": 0.4, "title": "Schwach", "fraktionen": []},
            {"gwoe_score": 9.0, "similarity": 0.55, "title": "Stark", "fraktionen": []},
        ])
        # max(2.0, 4.95) = 4.95
        assert r["score"] == pytest.approx(4.95)
        assert "Stark" in r["reason"]
class TestAggregateZeitreihe:
    """Checks for the time-series result of ``aggregate_themen_zeitreihe``."""

    def test_structure(self, populated_db):
        """The result exposes ``buckets``, ``sources`` and ``series``."""
        timeline = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
        for expected_key in ("buckets", "sources", "series"):
            assert expected_key in timeline

    def test_only_recent(self, populated_db):
        """With ``days_window=7`` the old news item must not be counted."""
        timeline = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
        # Only today's items (n1, n2) count; n3 is 200 days old.
        counted = 0
        for per_source_counts in timeline["series"].values():
            counted += sum(per_source_counts)
        assert counted == 2

    def test_series_aligned(self, populated_db):
        """Each source's series is exactly as long as the bucket list."""
        timeline = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7)
        for source_name in timeline["sources"]:
            per_source_counts = timeline["series"][source_name]
            assert len(per_source_counts) == len(timeline["buckets"])
# ─────────────────────────────────────────────────────────────────────────────
# aggregate_top_themen mit Relevance + only_relevant Filter
# ─────────────────────────────────────────────────────────────────────────────
class TestRelevanceInTopThemen:
    """Checks for the ``relevance`` data attached to ``aggregate_top_themen`` buckets."""

    def test_each_bucket_has_relevance(self, populated_db):
        """Every bucket has a ``relevance`` dict with level, score and reason."""
        outcome = aggregate_top_themen(db_path=populated_db, min_similarity=0.5)
        for bucket in outcome["buckets"]:
            assert "relevance" in bucket
            for field in ("level", "score", "reason"):
                assert field in bucket["relevance"]

    def test_only_relevant_filters_out_low_or_none(self, populated_db):
        """``only_relevant=True`` leaves only ``high`` and ``mid`` buckets."""
        outcome = aggregate_top_themen(
            db_path=populated_db,
            min_similarity=0.0,
            only_relevant=True,
        )
        allowed_levels = ("high", "mid")
        for bucket in outcome["buckets"]:
            assert bucket["relevance"]["level"] in allowed_levels

    def test_buckets_sorted_high_first(self, populated_db):
        """Buckets are ordered by relevance level, highest first."""
        outcome = aggregate_top_themen(db_path=populated_db, min_similarity=0.0)
        priority = {"high": 3, "mid": 2, "low": 1, "none": 0}
        observed = [
            priority.get(bucket["relevance"]["level"], 0)
            for bucket in outcome["buckets"]
        ]
        # The sequence has to be monotonically non-increasing.
        assert observed == sorted(observed, reverse=True)
# ─────────────────────────────────────────────────────────────────────────────
# aggregate_news_cluster
# ─────────────────────────────────────────────────────────────────────────────
class TestNewsCluster:
    """Checks for the cluster structure returned by ``aggregate_news_cluster``."""

    def test_structure(self, populated_db):
        """The result always exposes ``clusters`` and ``n_total_news``."""
        # High intra_threshold with a small min_cluster_size: the fixture's
        # news vectors are not identical, so this only probes the result shape.
        result = aggregate_news_cluster(
            db_path=populated_db, min_cluster_size=2,
            intra_threshold=0.99,  # only (near-)identical items
        )
        assert "clusters" in result
        assert "n_total_news" in result

    def test_loose_threshold_creates_cluster(self, populated_db):
        """Every cluster returned under a lax threshold is well-formed."""
        result = aggregate_news_cluster(
            db_path=populated_db, min_cluster_size=2,
            intra_threshold=0.0, days_window=30,
        )
        # The previous ``len(...) >= 0`` check could never fail. Whether the two
        # recent, orthogonal news items actually merge at threshold 0.0 depends
        # on the implementation's boundary handling, so only the container type
        # and the per-cluster shape are verified here.
        # TODO(review): tighten to ``len(result["clusters"]) >= 1`` once the
        # threshold semantics (inclusive vs. exclusive) are confirmed.
        assert isinstance(result["clusters"], list)
        for c in result["clusters"]:
            assert c["size"] >= 2
            assert "members" in c
            assert "antrag_matches" in c
            assert "top_tags" in c

    def test_min_cluster_size_filter(self, populated_db):
        """With only three news items in the DB nothing reaches size >= 5."""
        result = aggregate_news_cluster(
            db_path=populated_db, min_cluster_size=5,
        )
        assert result["clusters"] == []
# ─────────────────────────────────────────────────────────────────────────────
# aggregate_top_antraege_with_news
# ─────────────────────────────────────────────────────────────────────────────
class TestTopAntraegeWithNews:
    """Checks for ``aggregate_top_antraege_with_news`` (filtering, counts, order)."""

    def test_only_high_gwoe(self, populated_db):
        """Only assessments with ``gwoe_score >= min_gwoe_score`` are listed."""
        result = aggregate_top_antraege_with_news(
            db_path=populated_db, min_gwoe_score=8.0,
        )
        for a in result["antraege"]:
            assert a["gwoe_score"] >= 8.0
        # 18/A has 8.0, 18/B has 7.0, 18/C has 5.0, so only 18/A qualifies.
        druck = [a["drucksache"] for a in result["antraege"]]
        assert "18/A" in druck
        assert "18/B" not in druck
        assert "18/C" not in druck

    def test_news_count_per_antrag(self, populated_db):
        """18/A matches the housing news n1, so its ``news_count`` is >= 1."""
        result = aggregate_top_antraege_with_news(
            db_path=populated_db, min_gwoe_score=7.0, min_similarity=0.5,
            days_window=30,
        )
        # Use a default so a missing entry fails with a readable assertion
        # instead of a bare StopIteration.
        antrag_a = next(
            (a for a in result["antraege"] if a["drucksache"] == "18/A"),
            None,
        )
        assert antrag_a is not None, "18/A missing from result['antraege']"
        assert antrag_a["news_count"] >= 1

    def test_sort_news_first(self, populated_db):
        """Assessments with news come before assessments without news."""
        result = aggregate_top_antraege_with_news(
            db_path=populated_db, min_gwoe_score=7.0, min_similarity=0.5,
            days_window=30,
        )
        # True sorts before False in descending order, so the flags are already
        # in "news first" order exactly when no news-less entry precedes an
        # entry with news.
        has_news = [a["news_count"] > 0 for a in result["antraege"]]
        assert has_news == sorted(has_news, reverse=True)
# ─────────────────────────────────────────────────────────────────────────────
# TTL-Cache (Performance #170 followup)
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformanceCache:
    """Checks for the caching of ``aggregate_top_themen`` / ``aggregate_news_cluster``."""

    def test_top_themen_cache_hit_returns_same_object(self, populated_db):
        """A second call with identical arguments returns the very same dict."""
        cache_clear()
        call_args = {"db_path": populated_db, "min_similarity": 0.5}
        first = aggregate_top_themen(**call_args)
        second = aggregate_top_themen(**call_args)
        # A cache hit hands back the identical object, not just an equal one.
        assert first is second

    def test_top_themen_cache_miss_different_args(self, populated_db):
        """Different arguments produce a separate entry and a different dict."""
        cache_clear()
        at_half = aggregate_top_themen(db_path=populated_db, min_similarity=0.5)
        at_six_tenths = aggregate_top_themen(db_path=populated_db, min_similarity=0.6)
        # Different filter values must not share a cached result.
        assert at_half is not at_six_tenths

    def test_cache_clear_invalidates(self, populated_db):
        """After ``cache_clear`` the same call returns a new, equivalent result."""
        cache_clear()
        call_args = {"db_path": populated_db, "min_similarity": 0.5}
        before_clear = aggregate_top_themen(**call_args)
        cache_clear()
        after_clear = aggregate_top_themen(**call_args)
        # A fresh object is returned once the cache was cleared ...
        assert before_clear is not after_clear
        # ... but it carries the same number of buckets.
        assert len(before_clear["buckets"]) == len(after_clear["buckets"])

    def test_cluster_cached_too(self, populated_db):
        """``aggregate_news_cluster`` results are cached as well."""
        cache_clear()
        call_args = {"db_path": populated_db, "min_cluster_size": 1}
        first = aggregate_news_cluster(**call_args)
        second = aggregate_news_cluster(**call_args)
        assert first is second