"""Unit-Tests für app/clustering.py (#134 Phase 2). Testet reine Python-Funktionen (_cosine, UnionFind, _cluster_indices, _cluster_label, _dominant_fraktion, _cluster_summary) mit synthetischen Fixtures. DB-abhängige async-Funktionen (load_assessment_items, build_hierarchy, find_similar_assessments) werden mit gemocktem DB-Lader getestet. Fixture-Corpus: normalisierte Vektoren per Pure-Python (kein numpy nötig). """ from __future__ import annotations import asyncio import math import random from unittest.mock import patch import pytest # ─── Hilfsfunktionen ───────────────────────────────────────────────────────── def run(coro): return asyncio.get_event_loop().run_until_complete(coro) def _norm_py(v: list[float]) -> list[float]: """Normalisiert einen Vektor auf Länge 1 (pure Python).""" n = math.sqrt(sum(x * x for x in v)) return [x / n for x in v] if n > 0 else v def _make_items(n: int = 5, dim: int = 16, seed: int = 42) -> list[dict]: """Erstellt n normalisierte Embedding-Dicts mit reproduzierbaren Zufallswerten.""" rng = random.Random(seed) items = [] for i in range(n): raw = [rng.gauss(0, 1) for _ in range(dim)] items.append({ "drucksache": f"18/{1000 + i}", "title": f"Testantrag {i}", "bundesland": "NRW", "fraktionen": ["SPD"] if i % 2 == 0 else ["CDU"], "datum": "2026-04-20", "link": f"https://example.com/{i}", "gwoe_score": 5.0 + i * 0.5, "empfehlung": "Empfohlen", "empfehlung_symbol": "✓", "themen": [f"Thema{i % 3}"], "embedding": _norm_py(raw), }) return items # ─── _cosine ───────────────────────────────────────────────────────────────── class TestCosine: def test_identical_vectors_give_one(self): from app.clustering import _cosine v = [1.0, 0.0, 0.0] assert abs(_cosine(v, v) - 1.0) < 1e-9 def test_orthogonal_vectors_give_zero(self): from app.clustering import _cosine a = [1.0, 0.0] b = [0.0, 1.0] assert abs(_cosine(a, b)) < 1e-9 def test_opposite_vectors_give_minus_one(self): from app.clustering import _cosine a = [1.0, 0.0] b = [-1.0, 0.0] assert abs(_cosine(a, b) + 1.0) < 1e-9 def test_zero_vector_returns_zero(self): from app.clustering import _cosine assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0 def test_symmetry(self): from app.clustering import _cosine a = [0.6, 0.8] b = [0.8, 0.6] assert abs(_cosine(a, b) - _cosine(b, a)) < 1e-12 def test_range_normalized_vectors(self): from app.clustering import _cosine rng = random.Random(1) for _ in range(10): a = _norm_py([rng.gauss(0, 1) for _ in range(8)]) b = _norm_py([rng.gauss(0, 1) for _ in range(8)]) sim = _cosine(a, b) assert -1.0 - 1e-9 <= sim <= 1.0 + 1e-9 # ─── UnionFind ──────────────────────────────────────────────────────────────── class TestUnionFind: def test_initial_all_separate(self): from app.clustering import UnionFind uf = UnionFind(4) assert len({uf.find(i) for i in range(4)}) == 4 def test_union_merges_components(self): from app.clustering import UnionFind uf = UnionFind(4) uf.union(0, 1) uf.union(2, 3) assert uf.find(0) == uf.find(1) assert uf.find(2) == uf.find(3) assert uf.find(0) != uf.find(2) def test_union_find_path_compression(self): from app.clustering import UnionFind uf = UnionFind(5) uf.union(0, 1) uf.union(1, 2) uf.union(2, 3) uf.union(3, 4) root = uf.find(0) assert all(uf.find(i) == root for i in range(5)) def test_union_self_no_error(self): from app.clustering import UnionFind uf = UnionFind(3) uf.union(1, 1) assert uf.find(1) == uf.find(1) def test_empty_union_find(self): from app.clustering import UnionFind uf = UnionFind(0) assert uf.parent == [] # ─── _cluster_indices ──────────────────────────────────────────────────────── class TestClusterIndices: def test_empty_corpus_returns_empty(self): from app.clustering import _cluster_indices assert _cluster_indices([], 0.5) == [] def test_single_item_is_singleton(self): from app.clustering import _cluster_indices items = _make_items(1) groups = _cluster_indices(items, 0.5) assert len(groups) == 1 assert len(groups[0]) == 1 def test_all_identical_items_one_cluster(self): from app.clustering import _cosine, _cluster_indices # Alle denselben Vektor → kosinus = 1.0 → alle in einem Cluster v = [1.0, 0.0, 0.0] items = [ {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} for i in range(4) ] groups = _cluster_indices(items, 0.5) assert len(groups) == 1 assert len(groups[0]) == 4 def test_orthogonal_items_all_singletons(self): """Orthogonale Einheitsvektoren → kosinus=0 → alle Singletons.""" from app.clustering import _cluster_indices identity_vecs = [[1 if i == j else 0 for j in range(4)] for i in range(4)] items = [ {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} for i, v in enumerate(identity_vecs) ] groups = _cluster_indices(items, 0.5) # Alle Gruppen sind Singletons assert all(len(g) == 1 for g in groups) def test_higher_threshold_fewer_clusters(self): """Höherer Threshold → mehr Singletons, weniger große Cluster.""" from app.clustering import _cluster_indices items = _make_items(8, seed=99) groups_low = _cluster_indices(items, 0.1) groups_high = _cluster_indices(items, 0.99) # Bei low threshold: mind. eine Gruppe > 1 möglich # Bei high threshold (0.99): fast alle Singletons singleton_low = sum(1 for g in groups_low if len(g) == 1) singleton_high = sum(1 for g in groups_high if len(g) == 1) assert singleton_high >= singleton_low def test_sorted_by_size_descending(self): from app.clustering import _cluster_indices v = [1.0, 0.0] items = [ {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} for i in range(3) ] + [ {**_make_items(1)[0], "drucksache": "18/solo", "embedding": [0.0, 1.0]} ] groups = _cluster_indices(items, 0.5) sizes = [len(g) for g in groups] assert sizes == sorted(sizes, reverse=True) # ─── _dominant_fraktion ─────────────────────────────────────────────────────── class TestDominantFraktion: def test_majority_fraktion_wins(self): from app.clustering import _dominant_fraktion items = [ {"fraktionen": ["SPD"]}, {"fraktionen": ["SPD"]}, {"fraktionen": ["CDU"]}, ] assert _dominant_fraktion(items) == "SPD" def test_empty_items_returns_none(self): from app.clustering import _dominant_fraktion assert _dominant_fraktion([]) is None def test_empty_fraktionen_lists_returns_none(self): from app.clustering import _dominant_fraktion items = [{"fraktionen": []}, {"fraktionen": None}] assert _dominant_fraktion(items) is None # ─── _cluster_label ─────────────────────────────────────────────────────────── class TestClusterLabel: def test_top_theme_used_as_label(self): from app.clustering import _cluster_label items = [ {"themen": ["Klimaschutz", "Energie"], "title": "A"}, {"themen": ["Klimaschutz"], "title": "B"}, ] label = _cluster_label(items) assert "Klimaschutz" in label def test_fallback_to_shortest_title(self): from app.clustering import _cluster_label items = [ {"themen": [], "title": "Kurz"}, {"themen": [], "title": "Sehr langer Titel"}, ] label = _cluster_label(items) assert label == "Kurz" def test_fallback_cluster_label(self): from app.clustering import _cluster_label items = [{"themen": [], "title": None}] label = _cluster_label(items) assert label == "Cluster" # ─── _cluster_summary ──────────────────────────────────────────────────────── class TestClusterSummary: def test_basic_fields_present(self): from app.clustering import _cluster_summary items = _make_items(3) summary = _cluster_summary(items) for key in ("size", "label", "dominant_fraktion", "avg_gwoe_score", "drucksachen"): assert key in summary def test_size_correct(self): from app.clustering import _cluster_summary items = _make_items(4) summary = _cluster_summary(items) assert summary["size"] == 4 def test_avg_score_calculated(self): from app.clustering import _cluster_summary items = [ {**_make_items(1)[0], "gwoe_score": 4.0}, {**_make_items(1)[0], "gwoe_score": 6.0}, ] summary = _cluster_summary(items) assert summary["avg_gwoe_score"] == 5.0 def test_include_edges_adds_nodes_and_edges(self): from app.clustering import _cluster_summary items = _make_items(3) summary = _cluster_summary(items, include_edges=True) assert "nodes" in summary assert "edges" in summary assert len(summary["nodes"]) == 3 # 3 Knoten → 3 Kanten (0-1, 0-2, 1-2) assert len(summary["edges"]) == 3 def test_no_edges_without_flag(self): from app.clustering import _cluster_summary items = _make_items(3) summary = _cluster_summary(items, include_edges=False) assert "edges" not in summary assert "nodes" not in summary # ─── build_hierarchy (async, DB gemockt) ───────────────────────────────────── class TestBuildHierarchy: def test_empty_corpus_structure(self): """Leerer Corpus → korrekte Grundstruktur.""" from app import clustering async def fake_load(bundesland=None): return [] with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.build_hierarchy()) assert result["meta"]["total"] == 0 assert result["clusters"] == [] assert result["singletons"] == [] def test_single_item_becomes_singleton(self): from app import clustering items = _make_items(1) async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.build_hierarchy(threshold=0.5)) assert len(result["singletons"]) == 1 assert result["clusters"] == [] def test_meta_fields_present(self): from app import clustering items = _make_items(4) async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.build_hierarchy()) meta = result["meta"] for key in ("total", "threshold", "num_clusters", "num_singletons"): assert key in meta def test_threshold_affects_cluster_count(self): """Niedrigerer Threshold → mehr Kanten → potenziell mehr gebündelte Items.""" from app import clustering # Identische Items → immer ein Cluster bei jedem Threshold < 1.0 v = [1.0, 0.0, 0.0] items = [ {**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v} for i in range(3) ] async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.build_hierarchy(threshold=0.5)) assert len(result["clusters"]) == 1 assert result["clusters"][0]["size"] == 3 # ─── find_similar_assessments (async, DB gemockt) ──────────────────────────── class TestFindSimilarAssessments: def test_returns_empty_for_unknown_drucksache(self): from app import clustering items = _make_items(3) async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments("99/9999")) assert result == [] def test_returns_top_k_results(self): from app import clustering items = _make_items(5) target_id = items[0]["drucksache"] async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments(target_id, top_k=3)) assert len(result) == 3 def test_excludes_self(self): from app import clustering items = _make_items(5) target_id = items[0]["drucksache"] async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments(target_id, top_k=10)) drucksachen = [r["drucksache"] for r in result] assert target_id not in drucksachen def test_result_sorted_by_similarity_descending(self): from app import clustering items = _make_items(5) target_id = items[0]["drucksache"] async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments(target_id, top_k=4)) sims = [r["similarity"] for r in result] assert sims == sorted(sims, reverse=True) def test_result_fields_present(self): from app import clustering items = _make_items(3) target_id = items[0]["drucksache"] async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments(target_id, top_k=2)) for r in result: for key in ("drucksache", "title", "bundesland", "fraktionen", "gwoe_score", "empfehlung", "similarity"): assert key in r def test_single_item_corpus_returns_empty(self): """Nur ein Item im Corpus → nach Selbst-Ausschluss kein Ergebnis.""" from app import clustering items = _make_items(1) async def fake_load(bundesland=None): return items with patch.object(clustering, "load_assessment_items", side_effect=fake_load): result = run(clustering.find_similar_assessments(items[0]["drucksache"])) assert result == []