gwoe-antragspruefer/tests/test_clustering.py

439 lines
16 KiB
Python
Raw Normal View History

"""Unit-Tests für app/clustering.py (#134 Phase 2).
Testet reine Python-Funktionen (_cosine, UnionFind, _cluster_indices,
_cluster_label, _dominant_fraktion, _cluster_summary) mit synthetischen
Fixtures. DB-abhängige async-Funktionen (load_assessment_items,
build_hierarchy, find_similar_assessments) werden mit gemocktem DB-Lader
getestet.
Fixture-Corpus: normalisierte Vektoren per Pure-Python (kein numpy nötig).
"""
from __future__ import annotations
import asyncio
import math
import random
from unittest.mock import patch
import pytest
# ─── Hilfsfunktionen ─────────────────────────────────────────────────────────
def run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def _norm_py(v: list[float]) -> list[float]:
"""Normalisiert einen Vektor auf Länge 1 (pure Python)."""
n = math.sqrt(sum(x * x for x in v))
return [x / n for x in v] if n > 0 else v
def _make_items(n: int = 5, dim: int = 16, seed: int = 42) -> list[dict]:
"""Erstellt n normalisierte Embedding-Dicts mit reproduzierbaren Zufallswerten."""
rng = random.Random(seed)
items = []
for i in range(n):
raw = [rng.gauss(0, 1) for _ in range(dim)]
items.append({
"drucksache": f"18/{1000 + i}",
"title": f"Testantrag {i}",
"bundesland": "NRW",
"fraktionen": ["SPD"] if i % 2 == 0 else ["CDU"],
"datum": "2026-04-20",
"link": f"https://example.com/{i}",
"gwoe_score": 5.0 + i * 0.5,
"empfehlung": "Empfohlen",
"empfehlung_symbol": "",
"themen": [f"Thema{i % 3}"],
"embedding": _norm_py(raw),
})
return items
# ─── _cosine ─────────────────────────────────────────────────────────────────
class TestCosine:
def test_identical_vectors_give_one(self):
from app.clustering import _cosine
v = [1.0, 0.0, 0.0]
assert abs(_cosine(v, v) - 1.0) < 1e-9
def test_orthogonal_vectors_give_zero(self):
from app.clustering import _cosine
a = [1.0, 0.0]
b = [0.0, 1.0]
assert abs(_cosine(a, b)) < 1e-9
def test_opposite_vectors_give_minus_one(self):
from app.clustering import _cosine
a = [1.0, 0.0]
b = [-1.0, 0.0]
assert abs(_cosine(a, b) + 1.0) < 1e-9
def test_zero_vector_returns_zero(self):
from app.clustering import _cosine
assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0
def test_symmetry(self):
from app.clustering import _cosine
a = [0.6, 0.8]
b = [0.8, 0.6]
assert abs(_cosine(a, b) - _cosine(b, a)) < 1e-12
def test_range_normalized_vectors(self):
from app.clustering import _cosine
rng = random.Random(1)
for _ in range(10):
a = _norm_py([rng.gauss(0, 1) for _ in range(8)])
b = _norm_py([rng.gauss(0, 1) for _ in range(8)])
sim = _cosine(a, b)
assert -1.0 - 1e-9 <= sim <= 1.0 + 1e-9
# ─── UnionFind ────────────────────────────────────────────────────────────────
class TestUnionFind:
def test_initial_all_separate(self):
from app.clustering import UnionFind
uf = UnionFind(4)
assert len({uf.find(i) for i in range(4)}) == 4
def test_union_merges_components(self):
from app.clustering import UnionFind
uf = UnionFind(4)
uf.union(0, 1)
uf.union(2, 3)
assert uf.find(0) == uf.find(1)
assert uf.find(2) == uf.find(3)
assert uf.find(0) != uf.find(2)
def test_union_find_path_compression(self):
from app.clustering import UnionFind
uf = UnionFind(5)
uf.union(0, 1)
uf.union(1, 2)
uf.union(2, 3)
uf.union(3, 4)
root = uf.find(0)
assert all(uf.find(i) == root for i in range(5))
def test_union_self_no_error(self):
from app.clustering import UnionFind
uf = UnionFind(3)
uf.union(1, 1)
assert uf.find(1) == uf.find(1)
def test_empty_union_find(self):
from app.clustering import UnionFind
uf = UnionFind(0)
assert uf.parent == []
# ─── _cluster_indices ────────────────────────────────────────────────────────
class TestClusterIndices:
def test_empty_corpus_returns_empty(self):
from app.clustering import _cluster_indices
assert _cluster_indices([], 0.5) == []
def test_single_item_is_singleton(self):
from app.clustering import _cluster_indices
items = _make_items(1)
groups = _cluster_indices(items, 0.5)
assert len(groups) == 1
assert len(groups[0]) == 1
def test_all_identical_items_one_cluster(self):
from app.clustering import _cosine, _cluster_indices
# Alle denselben Vektor → kosinus = 1.0 → alle in einem Cluster
v = [1.0, 0.0, 0.0]
items = [
{**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v}
for i in range(4)
]
groups = _cluster_indices(items, 0.5)
assert len(groups) == 1
assert len(groups[0]) == 4
def test_orthogonal_items_all_singletons(self):
"""Orthogonale Einheitsvektoren → kosinus=0 → alle Singletons."""
from app.clustering import _cluster_indices
identity_vecs = [[1 if i == j else 0 for j in range(4)] for i in range(4)]
items = [
{**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v}
for i, v in enumerate(identity_vecs)
]
groups = _cluster_indices(items, 0.5)
# Alle Gruppen sind Singletons
assert all(len(g) == 1 for g in groups)
def test_higher_threshold_fewer_clusters(self):
"""Höherer Threshold → mehr Singletons, weniger große Cluster."""
from app.clustering import _cluster_indices
items = _make_items(8, seed=99)
groups_low = _cluster_indices(items, 0.1)
groups_high = _cluster_indices(items, 0.99)
# Bei low threshold: mind. eine Gruppe > 1 möglich
# Bei high threshold (0.99): fast alle Singletons
singleton_low = sum(1 for g in groups_low if len(g) == 1)
singleton_high = sum(1 for g in groups_high if len(g) == 1)
assert singleton_high >= singleton_low
def test_sorted_by_size_descending(self):
from app.clustering import _cluster_indices
v = [1.0, 0.0]
items = [
{**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v}
for i in range(3)
] + [
{**_make_items(1)[0], "drucksache": "18/solo", "embedding": [0.0, 1.0]}
]
groups = _cluster_indices(items, 0.5)
sizes = [len(g) for g in groups]
assert sizes == sorted(sizes, reverse=True)
# ─── _dominant_fraktion ───────────────────────────────────────────────────────
class TestDominantFraktion:
def test_majority_fraktion_wins(self):
from app.clustering import _dominant_fraktion
items = [
{"fraktionen": ["SPD"]},
{"fraktionen": ["SPD"]},
{"fraktionen": ["CDU"]},
]
assert _dominant_fraktion(items) == "SPD"
def test_empty_items_returns_none(self):
from app.clustering import _dominant_fraktion
assert _dominant_fraktion([]) is None
def test_empty_fraktionen_lists_returns_none(self):
from app.clustering import _dominant_fraktion
items = [{"fraktionen": []}, {"fraktionen": None}]
assert _dominant_fraktion(items) is None
# ─── _cluster_label ───────────────────────────────────────────────────────────
class TestClusterLabel:
def test_top_theme_used_as_label(self):
from app.clustering import _cluster_label
items = [
{"themen": ["Klimaschutz", "Energie"], "title": "A"},
{"themen": ["Klimaschutz"], "title": "B"},
]
label = _cluster_label(items)
assert "Klimaschutz" in label
def test_fallback_to_shortest_title(self):
from app.clustering import _cluster_label
items = [
{"themen": [], "title": "Kurz"},
{"themen": [], "title": "Sehr langer Titel"},
]
label = _cluster_label(items)
assert label == "Kurz"
def test_fallback_cluster_label(self):
from app.clustering import _cluster_label
items = [{"themen": [], "title": None}]
label = _cluster_label(items)
assert label == "Cluster"
# ─── _cluster_summary ────────────────────────────────────────────────────────
class TestClusterSummary:
def test_basic_fields_present(self):
from app.clustering import _cluster_summary
items = _make_items(3)
summary = _cluster_summary(items)
for key in ("size", "label", "dominant_fraktion", "avg_gwoe_score", "drucksachen"):
assert key in summary
def test_size_correct(self):
from app.clustering import _cluster_summary
items = _make_items(4)
summary = _cluster_summary(items)
assert summary["size"] == 4
def test_avg_score_calculated(self):
from app.clustering import _cluster_summary
items = [
{**_make_items(1)[0], "gwoe_score": 4.0},
{**_make_items(1)[0], "gwoe_score": 6.0},
]
summary = _cluster_summary(items)
assert summary["avg_gwoe_score"] == 5.0
def test_include_edges_adds_nodes_and_edges(self):
from app.clustering import _cluster_summary
items = _make_items(3)
summary = _cluster_summary(items, include_edges=True)
assert "nodes" in summary
assert "edges" in summary
assert len(summary["nodes"]) == 3
# 3 Knoten → 3 Kanten (0-1, 0-2, 1-2)
assert len(summary["edges"]) == 3
def test_no_edges_without_flag(self):
from app.clustering import _cluster_summary
items = _make_items(3)
summary = _cluster_summary(items, include_edges=False)
assert "edges" not in summary
assert "nodes" not in summary
# ─── build_hierarchy (async, DB gemockt) ─────────────────────────────────────
class TestBuildHierarchy:
def test_empty_corpus_structure(self):
"""Leerer Corpus → korrekte Grundstruktur."""
from app import clustering
async def fake_load(bundesland=None):
return []
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.build_hierarchy())
assert result["meta"]["total"] == 0
assert result["clusters"] == []
assert result["singletons"] == []
def test_single_item_becomes_singleton(self):
from app import clustering
items = _make_items(1)
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.build_hierarchy(threshold=0.5))
assert len(result["singletons"]) == 1
assert result["clusters"] == []
def test_meta_fields_present(self):
from app import clustering
items = _make_items(4)
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.build_hierarchy())
meta = result["meta"]
for key in ("total", "threshold", "num_clusters", "num_singletons"):
assert key in meta
def test_threshold_affects_cluster_count(self):
"""Niedrigerer Threshold → mehr Kanten → potenziell mehr gebündelte Items."""
from app import clustering
# Identische Items → immer ein Cluster bei jedem Threshold < 1.0
v = [1.0, 0.0, 0.0]
items = [
{**_make_items(1)[0], "drucksache": f"18/{i}", "embedding": v}
for i in range(3)
]
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.build_hierarchy(threshold=0.5))
assert len(result["clusters"]) == 1
assert result["clusters"][0]["size"] == 3
# ─── find_similar_assessments (async, DB gemockt) ────────────────────────────
class TestFindSimilarAssessments:
def test_returns_empty_for_unknown_drucksache(self):
from app import clustering
items = _make_items(3)
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments("99/9999"))
assert result == []
def test_returns_top_k_results(self):
from app import clustering
items = _make_items(5)
target_id = items[0]["drucksache"]
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments(target_id, top_k=3))
assert len(result) == 3
def test_excludes_self(self):
from app import clustering
items = _make_items(5)
target_id = items[0]["drucksache"]
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments(target_id, top_k=10))
drucksachen = [r["drucksache"] for r in result]
assert target_id not in drucksachen
def test_result_sorted_by_similarity_descending(self):
from app import clustering
items = _make_items(5)
target_id = items[0]["drucksache"]
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments(target_id, top_k=4))
sims = [r["similarity"] for r in result]
assert sims == sorted(sims, reverse=True)
def test_result_fields_present(self):
from app import clustering
items = _make_items(3)
target_id = items[0]["drucksache"]
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments(target_id, top_k=2))
for r in result:
for key in ("drucksache", "title", "bundesland", "fraktionen",
"gwoe_score", "empfehlung", "similarity"):
assert key in r
def test_single_item_corpus_returns_empty(self):
"""Nur ein Item im Corpus → nach Selbst-Ausschluss kein Ergebnis."""
from app import clustering
items = _make_items(1)
async def fake_load(bundesland=None):
return items
with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
result = run(clustering.find_similar_assessments(items[0]["drucksache"]))
assert result == []