"""Tests fuer app.themen_matching (#170 Phase 2).""" from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone, timedelta from pathlib import Path from unittest.mock import patch import pytest from app.themen_matching import ( aggregate_news_cluster, aggregate_themen_zeitreihe, aggregate_top_antraege_with_news, aggregate_top_themen, cache_clear, compute_relevance, find_anträge_for_news, find_news_for_antrag, ) # ───────────────────────────────────────────────────────────────────────────── # Fixture: DB mit News + Assessments + Embeddings # ───────────────────────────────────────────────────────────────────────────── def _vec(dim: int = 8, val: float = 0.1) -> bytes: """Konstruiert einen einfachen Vektor als JSON-Bytes.""" return json.dumps([val] * dim).encode() def _vec_from(values: list[float]) -> bytes: return json.dumps(values).encode() @pytest.fixture def populated_db(tmp_path: Path) -> Path: db = tmp_path / "test_match.db" conn = sqlite3.connect(str(db)) conn.execute(""" CREATE TABLE news_articles ( url TEXT PRIMARY KEY, titel TEXT NOT NULL, summary TEXT, datum TEXT NOT NULL, source TEXT NOT NULL, ressort TEXT, tags TEXT, summary_embedding BLOB, embedding_model TEXT, fetched_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) conn.execute(""" CREATE TABLE assessments ( drucksache TEXT PRIMARY KEY, title TEXT, fraktionen TEXT, datum TEXT, link TEXT, bundesland TEXT, gwoe_score REAL, gwoe_begruendung TEXT, gwoe_matrix TEXT, gwoe_schwerpunkt TEXT, wahlprogramm_scores TEXT, verbesserungen TEXT, staerken TEXT, schwaechen TEXT, empfehlung TEXT, empfehlung_symbol TEXT, verbesserungspotenzial TEXT, themen TEXT, antrag_zusammenfassung TEXT, antrag_kernpunkte TEXT, source TEXT, model TEXT, created_at TEXT, updated_at TEXT, summary_embedding BLOB, embedding_model TEXT ) """) today = datetime.now(timezone.utc).isoformat() yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat() old = (datetime.now(timezone.utc) - timedelta(days=200)).isoformat() # News-Artikel mit unterschiedlichen Embeddings news = [ # Wohnungsbau-News (vec orientiert auf [1,0,0,...]) ("https://example.com/n1", "Wohnungsbau-Reform", "Bundestag berät Wohnungsbau", today, "tagesschau", "inland", '["Wohnungsbau"]', _vec_from([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])), # Klima-News (vec orientiert auf [0,1,0,...]) ("https://example.com/n2", "Klimaschutzgesetz", "EU plant Klimaziele", today, "tagesschau", "ausland", '["Klima"]', _vec_from([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])), # Old news, sollte aus Zeitfenster filtern ("https://example.com/n3", "Alte News", "", old, "tagesschau", "inland", '[]', _vec_from([0.5, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])), ] for url, titel, summary, datum, source, ressort, tags, vec in news: conn.execute( """INSERT INTO news_articles (url, titel, summary, datum, source, ressort, tags, summary_embedding, embedding_model) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'qwen-embedding-v4')""", (url, titel, summary, datum, source, ressort, tags, vec), ) # Assessments mit Embeddings: # - 18/A passt zu Wohnungsbau-News (vec [1,0,...]) # - 18/B passt zu Klima-News # - 18/C ist orthogonal — sollte nirgends matchen now_iso = datetime.now().isoformat() assessments = [ ("18/A", "Wohnungsbau-Antrag", '["GRÜNE"]', "2026-04-15", "NRW", 8.0, "Uneingeschränkt unterstützen", _vec_from([0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])), ("18/B", "Klima-Antrag", '["SPD"]', "2026-04-16", "NRW", 7.0, "Unterstützen mit Änderungen", _vec_from([0.0, 0.95, 0.1, 0.0, 0.0, 0.0, 0.0, 0.0])), ("18/C", "Sonstiges", '["CDU"]', "2026-04-17", "NRW", 5.0, "Überarbeiten", _vec_from([0.0, 0.0, 0.0, 0.0, 0.95, 0.0, 0.0, 0.0])), ] for ds, title, fr, dat, bl, sc, emp, vec in assessments: conn.execute( """INSERT INTO assessments (drucksache, title, fraktionen, datum, bundesland, gwoe_score, empfehlung, themen, source, model, created_at, updated_at, summary_embedding, embedding_model) VALUES (?, ?, ?, ?, ?, ?, ?, '[]', 'test', 'test', ?, ?, ?, 'qwen-embedding-v4')""", (ds, title, fr, dat, bl, sc, emp, now_iso, now_iso, vec), ) conn.commit() conn.close() return db @pytest.fixture(autouse=True) def mock_embedding_model(): """Stellt sicher, dass EMBEDDING_MODEL_READ=qwen-embedding-v4 fuer Tests.""" with patch("app.embeddings.EMBEDDING_MODEL_READ", "qwen-embedding-v4"): yield # ───────────────────────────────────────────────────────────────────────────── # find_anträge_for_news # ───────────────────────────────────────────────────────────────────────────── class TestFindAnträgeForNews: def test_wohnungsbau_news_matches_wohnungsbau_antrag(self, populated_db): result = find_anträge_for_news( "https://example.com/n1", db_path=populated_db, min_similarity=0.5, ) assert len(result) >= 1 # Top-Match sollte 18/A sein assert result[0]["drucksache"] == "18/A" assert result[0]["similarity"] > 0.9 def test_klima_news_matches_klima_antrag(self, populated_db): result = find_anträge_for_news( "https://example.com/n2", db_path=populated_db, min_similarity=0.5, ) assert len(result) >= 1 assert result[0]["drucksache"] == "18/B" def test_min_similarity_filters_orthogonal(self, populated_db): """Mit hohem min_similarity-Cutoff darf kein orthogonaler Antrag drin sein.""" result = find_anträge_for_news( "https://example.com/n1", db_path=populated_db, min_similarity=0.9, ) druck = [r["drucksache"] for r in result] assert "18/C" not in druck # 18/C ist orthogonal zu allem def test_unknown_news_returns_empty(self, populated_db): assert find_anträge_for_news( "https://example.com/missing", db_path=populated_db, ) == [] def test_empty_db(self, tmp_path): assert find_anträge_for_news( "x", db_path=tmp_path / "missing.db", ) == [] # ───────────────────────────────────────────────────────────────────────────── # find_news_for_antrag # ───────────────────────────────────────────────────────────────────────────── class TestFindNewsForAntrag: def test_wohnungsbau_antrag_matches_wohnungsbau_news(self, populated_db): result = find_news_for_antrag( "18/A", db_path=populated_db, min_similarity=0.5, ) assert len(result) >= 1 assert result[0]["url"] == "https://example.com/n1" def test_old_news_filtered_out(self, populated_db): """News aus dem 200-Tage-alten Bucket darf nicht im 90-Tage-Fenster auftauchen.""" result = find_news_for_antrag( "18/A", db_path=populated_db, min_similarity=0.0, days_window=90, ) urls = [r["url"] for r in result] assert "https://example.com/n3" not in urls def test_top_k_limits(self, populated_db): """top_k=1 liefert nur den besten Match.""" result = find_news_for_antrag( "18/A", db_path=populated_db, min_similarity=0.0, top_k=1, ) assert len(result) <= 1 def test_unknown_antrag(self, populated_db): assert find_news_for_antrag( "99/Missing", db_path=populated_db, ) == [] # ───────────────────────────────────────────────────────────────────────────── # aggregate_top_themen # ───────────────────────────────────────────────────────────────────────────── class TestAggregateTopThemen: def test_returns_buckets(self, populated_db): result = aggregate_top_themen( db_path=populated_db, min_similarity=0.5, ) # Heute gibt es 2 News-Artikel, beide mit Match assert len(result["buckets"]) == 2 assert "n_total_news" in result def test_each_bucket_has_news_and_matches(self, populated_db): result = aggregate_top_themen( db_path=populated_db, min_similarity=0.5, ) for b in result["buckets"]: assert "news" in b assert "matches" in b assert "url" in b["news"] assert "titel" in b["news"] def test_days_window_filter(self, populated_db): """Mit kleinem Fenster nur die fresh News, alte raus.""" result = aggregate_top_themen( db_path=populated_db, days_window=7, min_similarity=0.5, ) for b in result["buckets"]: assert b["news"]["url"] != "https://example.com/n3" def test_min_similarity_filter(self, populated_db): """Mit hohem min_sim verschwinden Cross-Matches.""" result = aggregate_top_themen( db_path=populated_db, min_similarity=0.99, ) # Nur exakte Matches sollten überleben for b in result["buckets"]: for m in b["matches"]: assert m["similarity"] > 0.99 # ───────────────────────────────────────────────────────────────────────────── # aggregate_themen_zeitreihe # ───────────────────────────────────────────────────────────────────────────── class TestComputeRelevance: def test_empty_returns_none_level(self): r = compute_relevance([]) assert r["level"] == "none" assert r["score"] == 0.0 def test_high_score_high_sim_high_level(self): r = compute_relevance([{ "drucksache": "x", "title": "T", "fraktionen": ["GRÜNE"], "gwoe_score": 8.0, "similarity": 0.6, }]) # 8.0 × 0.6 = 4.8 → high assert r["level"] == "high" assert r["score"] == 4.8 assert "GWÖ-8.0" in r["reason"] def test_low_score_low_level(self): r = compute_relevance([{ "drucksache": "x", "title": "T", "fraktionen": [], "gwoe_score": 3.0, "similarity": 0.5, }]) # 3.0 × 0.5 = 1.5 → low assert r["level"] == "low" def test_mid_level(self): r = compute_relevance([{ "drucksache": "x", "title": "T", "fraktionen": [], "gwoe_score": 6.0, "similarity": 0.5, }]) # 6.0 × 0.5 = 3.0 → mid assert r["level"] == "mid" def test_takes_best_match(self): r = compute_relevance([ {"gwoe_score": 5.0, "similarity": 0.4, "title": "Schwach", "fraktionen": []}, {"gwoe_score": 9.0, "similarity": 0.55, "title": "Stark", "fraktionen": []}, ]) # max(2.0, 4.95) = 4.95 → high assert r["score"] == 4.95 assert "Stark" in r["reason"] class TestAggregateZeitreihe: def test_structure(self, populated_db): result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7) assert "buckets" in result assert "sources" in result assert "series" in result def test_only_recent(self, populated_db): """Mit days_window=7 darf das alte News nicht im Bucket auftauchen.""" result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7) # Nur heutige News (n1, n2) — n3 ist 200 Tage alt total = sum(sum(s) for s in result["series"].values()) assert total == 2 def test_series_aligned(self, populated_db): """Pro Source: series-Liste muss exakt so lang sein wie buckets.""" result = aggregate_themen_zeitreihe(db_path=populated_db, days_window=7) for source in result["sources"]: assert len(result["series"][source]) == len(result["buckets"]) # ───────────────────────────────────────────────────────────────────────────── # aggregate_top_themen mit Relevance + only_relevant Filter # ───────────────────────────────────────────────────────────────────────────── class TestRelevanceInTopThemen: def test_each_bucket_has_relevance(self, populated_db): result = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) for b in result["buckets"]: assert "relevance" in b assert "level" in b["relevance"] assert "score" in b["relevance"] assert "reason" in b["relevance"] def test_only_relevant_filters_out_low_or_none(self, populated_db): result = aggregate_top_themen( db_path=populated_db, min_similarity=0.0, only_relevant=True, ) for b in result["buckets"]: assert b["relevance"]["level"] in ("high", "mid") def test_buckets_sorted_high_first(self, populated_db): result = aggregate_top_themen(db_path=populated_db, min_similarity=0.0) levels = [b["relevance"]["level"] for b in result["buckets"]] rank = {"high": 3, "mid": 2, "low": 1, "none": 0} ranks = [rank.get(l, 0) for l in levels] # Reihenfolge muss monoton fallen assert ranks == sorted(ranks, reverse=True) # ───────────────────────────────────────────────────────────────────────────── # aggregate_news_cluster # ───────────────────────────────────────────────────────────────────────────── class TestNewsCluster: def test_structure(self, populated_db): # Mit hoeherem intra_threshold und kleinerem min_cluster_size # auf der Test-DB: orthogonale News bilden keine Cluster result = aggregate_news_cluster( db_path=populated_db, min_cluster_size=2, intra_threshold=0.99, # nur identische ) assert "clusters" in result assert "n_total_news" in result def test_loose_threshold_creates_cluster(self, populated_db): # Threshold sehr lax → fast alles in einem Cluster result = aggregate_news_cluster( db_path=populated_db, min_cluster_size=2, intra_threshold=0.0, days_window=30, ) # Mindestens ein Cluster mit >=2 Members assert len(result["clusters"]) >= 0 for c in result["clusters"]: assert c["size"] >= 2 assert "members" in c assert "antrag_matches" in c assert "top_tags" in c def test_min_cluster_size_filter(self, populated_db): result = aggregate_news_cluster( db_path=populated_db, min_cluster_size=5, ) # Nur 3 News in der DB → nichts erreicht size>=5 assert result["clusters"] == [] # ───────────────────────────────────────────────────────────────────────────── # aggregate_top_antraege_with_news # ───────────────────────────────────────────────────────────────────────────── class TestTopAntraegeWithNews: def test_only_high_gwoe(self, populated_db): """Nur Antraege mit gwoe_score >= min_gwoe_score auftauchen.""" result = aggregate_top_antraege_with_news( db_path=populated_db, min_gwoe_score=8.0, ) for a in result["antraege"]: assert a["gwoe_score"] >= 8.0 # 18/A hat 8.0, 18/B hat 7.0, 18/C hat 5.0 → nur 18/A druck = [a["drucksache"] for a in result["antraege"]] assert "18/A" in druck assert "18/B" not in druck assert "18/C" not in druck def test_news_count_per_antrag(self, populated_db): result = aggregate_top_antraege_with_news( db_path=populated_db, min_gwoe_score=7.0, min_similarity=0.5, days_window=30, ) # 18/A passt zu n1 (Wohnungsbau) — news_count >= 1 antrag_a = next(a for a in result["antraege"] if a["drucksache"] == "18/A") assert antrag_a["news_count"] >= 1 def test_sort_news_first(self, populated_db): result = aggregate_top_antraege_with_news( db_path=populated_db, min_gwoe_score=7.0, min_similarity=0.5, days_window=30, ) # Antraege mit news_count > 0 sollten vor denen ohne stehen last_with_news = -1 first_without = len(result["antraege"]) for i, a in enumerate(result["antraege"]): if a["news_count"] > 0: last_with_news = i elif first_without == len(result["antraege"]): first_without = i assert last_with_news < first_without # ───────────────────────────────────────────────────────────────────────────── # TTL-Cache (Performance #170 followup) # ───────────────────────────────────────────────────────────────────────────── class TestPerformanceCache: def test_top_themen_cache_hit_returns_same_object(self, populated_db): """Zweiter Call mit gleichen Args sollte den gleichen dict liefern.""" cache_clear() a = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) b = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) # Cache liefert dasselbe Objekt (identity check) assert a is b def test_top_themen_cache_miss_different_args(self, populated_db): """Andere Args → neuer Eintrag, anderer dict.""" cache_clear() a = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) b = aggregate_top_themen(db_path=populated_db, min_similarity=0.6) # Different filter values → different cache-keys assert a is not b def test_cache_clear_invalidates(self, populated_db): cache_clear() a = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) cache_clear() b = aggregate_top_themen(db_path=populated_db, min_similarity=0.5) # Nach clear: neuer Aufruf gibt neues Objekt zurueck assert a is not b # Inhaltlich identisch assert len(a["buckets"]) == len(b["buckets"]) def test_cluster_cached_too(self, populated_db): cache_clear() a = aggregate_news_cluster(db_path=populated_db, min_cluster_size=1) b = aggregate_news_cluster(db_path=populated_db, min_cluster_size=1) assert a is b