Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""Unit-Tests für app/clustering.py (#134 Phase 2).
|
|
|
|
Testet reine Python-Funktionen (_cosine, UnionFind, _cluster_indices,
|
|
_cluster_label, _dominant_fraktion, _cluster_summary) mit synthetischen
|
|
Fixtures. DB-abhängige async-Funktionen (load_assessment_items,
|
|
build_hierarchy, find_similar_assessments) werden mit gemocktem DB-Lader
|
|
getestet.
|
|
|
|
Fixture-Corpus: normalisierte Vektoren per Pure-Python (kein numpy nötig).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import math
|
|
import random
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ─── Hilfsfunktionen ─────────────────────────────────────────────────────────
|
|
|
|
def run(coro):
    """Drive *coro* to completion on a fresh event loop and return its result.

    Uses ``asyncio.run()`` instead of the legacy
    ``asyncio.get_event_loop().run_until_complete()`` pattern: implicitly
    creating a loop via ``get_event_loop()`` outside a running loop has been
    deprecated since Python 3.10 and is scheduled for removal, which would
    break these tests on newer interpreters.
    """
    return asyncio.run(coro)
|
|
|
|
|
|
def _norm_py(v: list[float]) -> list[float]:
|
|
"""Normalisiert einen Vektor auf Länge 1 (pure Python)."""
|
|
n = math.sqrt(sum(x * x for x in v))
|
|
return [x / n for x in v] if n > 0 else v
|
|
|
|
|
|
def _make_items(n: int = 5, dim: int = 16, seed: int = 42) -> list[dict]:
    """Build *n* assessment dicts with reproducible pseudo-random unit embeddings.

    The single seeded RNG and the per-item draw order are deliberately kept
    stable so that identical (n, dim, seed) arguments always yield an
    identical corpus across test runs.
    """
    rng = random.Random(seed)
    corpus: list[dict] = []
    for idx in range(n):
        raw_vector = [rng.gauss(0, 1) for _ in range(dim)]
        corpus.append(
            {
                "drucksache": f"18/{1000 + idx}",
                "title": f"Testantrag {idx}",
                "bundesland": "NRW",
                # Alternate sponsors so faction-majority logic has variety.
                "fraktionen": ["SPD"] if idx % 2 == 0 else ["CDU"],
                "datum": "2026-04-20",
                "link": f"https://example.com/{idx}",
                "gwoe_score": 5.0 + idx * 0.5,
                "empfehlung": "Empfohlen",
                "empfehlung_symbol": "✓",
                "themen": [f"Thema{idx % 3}"],
                "embedding": _norm_py(raw_vector),
            }
        )
    return corpus
|
|
|
|
|
|
# ─── _cosine ─────────────────────────────────────────────────────────────────
|
|
|
|
class TestCosine:
    """Behavioural checks for the cosine similarity helper."""

    def test_identical_vectors_give_one(self):
        from app.clustering import _cosine
        vec = [1.0, 0.0, 0.0]
        assert abs(_cosine(vec, vec) - 1.0) < 1e-9

    def test_orthogonal_vectors_give_zero(self):
        from app.clustering import _cosine
        assert abs(_cosine([1.0, 0.0], [0.0, 1.0])) < 1e-9

    def test_opposite_vectors_give_minus_one(self):
        from app.clustering import _cosine
        assert abs(_cosine([1.0, 0.0], [-1.0, 0.0]) + 1.0) < 1e-9

    def test_zero_vector_returns_zero(self):
        from app.clustering import _cosine
        # Degenerate input must not raise (division by zero) but yield 0.0.
        assert _cosine([0.0, 0.0], [1.0, 0.0]) == 0.0

    def test_symmetry(self):
        from app.clustering import _cosine
        left, right = [0.6, 0.8], [0.8, 0.6]
        assert abs(_cosine(left, right) - _cosine(right, left)) < 1e-12

    def test_range_normalized_vectors(self):
        from app.clustering import _cosine
        rng = random.Random(1)
        # Cosine of unit vectors stays within [-1, 1] up to float noise.
        for _ in range(10):
            first = _norm_py([rng.gauss(0, 1) for _ in range(8)])
            second = _norm_py([rng.gauss(0, 1) for _ in range(8)])
            assert -1.0 - 1e-9 <= _cosine(first, second) <= 1.0 + 1e-9
|
|
|
|
|
|
# ─── UnionFind ────────────────────────────────────────────────────────────────
|
|
|
|
class TestUnionFind:
    """Disjoint-set behaviour: isolation, merging, transitive connectivity."""

    def test_initial_all_separate(self):
        from app.clustering import UnionFind
        uf = UnionFind(4)
        roots = {uf.find(idx) for idx in range(4)}
        assert len(roots) == 4

    def test_union_merges_components(self):
        from app.clustering import UnionFind
        uf = UnionFind(4)
        uf.union(0, 1)
        uf.union(2, 3)
        assert uf.find(0) == uf.find(1)
        assert uf.find(2) == uf.find(3)
        # The two merged pairs remain distinct components.
        assert uf.find(0) != uf.find(2)

    def test_union_find_path_compression(self):
        from app.clustering import UnionFind
        uf = UnionFind(5)
        # Chain all five elements together pairwise.
        for left, right in ((0, 1), (1, 2), (2, 3), (3, 4)):
            uf.union(left, right)
        representative = uf.find(0)
        assert all(uf.find(idx) == representative for idx in range(5))

    def test_union_self_no_error(self):
        from app.clustering import UnionFind
        uf = UnionFind(3)
        uf.union(1, 1)  # merging an element with itself is a no-op
        assert uf.find(1) == uf.find(1)

    def test_empty_union_find(self):
        from app.clustering import UnionFind
        assert UnionFind(0).parent == []
|
|
|
|
|
|
# ─── _cluster_indices ────────────────────────────────────────────────────────
|
|
|
|
class TestClusterIndices:
    """Grouping behaviour of _cluster_indices at various thresholds."""

    def test_empty_corpus_returns_empty(self):
        from app.clustering import _cluster_indices
        assert _cluster_indices([], 0.5) == []

    def test_single_item_is_singleton(self):
        from app.clustering import _cluster_indices
        groups = _cluster_indices(_make_items(1), 0.5)
        assert len(groups) == 1
        assert len(groups[0]) == 1

    def test_all_identical_items_one_cluster(self):
        from app.clustering import _cluster_indices
        # Same vector everywhere -> cosine 1.0 -> a single 4-element cluster.
        shared = [1.0, 0.0, 0.0]
        corpus = [
            {**_make_items(1)[0], "drucksache": f"18/{idx}", "embedding": shared}
            for idx in range(4)
        ]
        groups = _cluster_indices(corpus, 0.5)
        assert len(groups) == 1
        assert len(groups[0]) == 4

    def test_orthogonal_items_all_singletons(self):
        """Orthogonal unit vectors -> cosine 0 -> every item stays alone."""
        from app.clustering import _cluster_indices
        basis = [[1 if row == col else 0 for col in range(4)] for row in range(4)]
        corpus = [
            {**_make_items(1)[0], "drucksache": f"18/{idx}", "embedding": vec}
            for idx, vec in enumerate(basis)
        ]
        assert all(len(group) == 1 for group in _cluster_indices(corpus, 0.5))

    def test_higher_threshold_fewer_clusters(self):
        """A stricter threshold can only increase the singleton count."""
        from app.clustering import _cluster_indices
        corpus = _make_items(8, seed=99)
        groups_low = _cluster_indices(corpus, 0.1)
        groups_high = _cluster_indices(corpus, 0.99)
        # At 0.99 virtually everything should fall apart into singletons.
        singletons_low = sum(1 for group in groups_low if len(group) == 1)
        singletons_high = sum(1 for group in groups_high if len(group) == 1)
        assert singletons_high >= singletons_low

    def test_sorted_by_size_descending(self):
        from app.clustering import _cluster_indices
        shared = [1.0, 0.0]
        corpus = [
            {**_make_items(1)[0], "drucksache": f"18/{idx}", "embedding": shared}
            for idx in range(3)
        ]
        # One outlier orthogonal to the triple guarantees two group sizes.
        corpus.append(
            {**_make_items(1)[0], "drucksache": "18/solo", "embedding": [0.0, 1.0]}
        )
        sizes = [len(group) for group in _cluster_indices(corpus, 0.5)]
        assert sizes == sorted(sizes, reverse=True)
|
|
|
|
|
|
# ─── _dominant_fraktion ───────────────────────────────────────────────────────
|
|
|
|
class TestDominantFraktion:
    """Majority vote over the 'fraktionen' lists of a cluster."""

    def test_majority_fraktion_wins(self):
        from app.clustering import _dominant_fraktion
        votes = [
            {"fraktionen": ["SPD"]},
            {"fraktionen": ["SPD"]},
            {"fraktionen": ["CDU"]},
        ]
        assert _dominant_fraktion(votes) == "SPD"

    def test_empty_items_returns_none(self):
        from app.clustering import _dominant_fraktion
        assert _dominant_fraktion([]) is None

    def test_empty_fraktionen_lists_returns_none(self):
        from app.clustering import _dominant_fraktion
        # Empty list and None entries both count as "no faction data".
        assert _dominant_fraktion([{"fraktionen": []}, {"fraktionen": None}]) is None
|
|
|
|
|
|
# ─── _cluster_label ───────────────────────────────────────────────────────────
|
|
|
|
class TestClusterLabel:
    """Label derivation: dominant theme first, then title, then a generic fallback."""

    def test_top_theme_used_as_label(self):
        from app.clustering import _cluster_label
        cluster = [
            {"themen": ["Klimaschutz", "Energie"], "title": "A"},
            {"themen": ["Klimaschutz"], "title": "B"},
        ]
        assert "Klimaschutz" in _cluster_label(cluster)

    def test_fallback_to_shortest_title(self):
        from app.clustering import _cluster_label
        cluster = [
            {"themen": [], "title": "Kurz"},
            {"themen": [], "title": "Sehr langer Titel"},
        ]
        assert _cluster_label(cluster) == "Kurz"

    def test_fallback_cluster_label(self):
        from app.clustering import _cluster_label
        # No themes and no usable title at all -> generic "Cluster" label.
        assert _cluster_label([{"themen": [], "title": None}]) == "Cluster"
|
|
|
|
|
|
# ─── _cluster_summary ────────────────────────────────────────────────────────
|
|
|
|
class TestClusterSummary:
    """Shape and aggregate values of the per-cluster summary dict."""

    def test_basic_fields_present(self):
        from app.clustering import _cluster_summary
        summary = _cluster_summary(_make_items(3))
        required = {"size", "label", "dominant_fraktion", "avg_gwoe_score", "drucksachen"}
        assert required <= summary.keys()

    def test_size_correct(self):
        from app.clustering import _cluster_summary
        assert _cluster_summary(_make_items(4))["size"] == 4

    def test_avg_score_calculated(self):
        from app.clustering import _cluster_summary
        scored = [
            {**_make_items(1)[0], "gwoe_score": 4.0},
            {**_make_items(1)[0], "gwoe_score": 6.0},
        ]
        assert _cluster_summary(scored)["avg_gwoe_score"] == 5.0

    def test_include_edges_adds_nodes_and_edges(self):
        from app.clustering import _cluster_summary
        summary = _cluster_summary(_make_items(3), include_edges=True)
        assert "nodes" in summary
        assert "edges" in summary
        assert len(summary["nodes"]) == 3
        # Complete graph over 3 nodes: edges 0-1, 0-2, 1-2.
        assert len(summary["edges"]) == 3

    def test_no_edges_without_flag(self):
        from app.clustering import _cluster_summary
        summary = _cluster_summary(_make_items(3), include_edges=False)
        assert "edges" not in summary
        assert "nodes" not in summary
|
|
|
|
|
|
# ─── build_hierarchy (async, DB gemockt) ─────────────────────────────────────
|
|
|
|
class TestBuildHierarchy:
    """build_hierarchy against a patched (in-memory) assessment loader."""

    @staticmethod
    def _with_corpus(corpus):
        """Return a patch context that serves *corpus* instead of hitting the DB."""
        from app import clustering

        async def fake_load(bundesland=None):
            return corpus

        return patch.object(clustering, "load_assessment_items", side_effect=fake_load)

    def test_empty_corpus_structure(self):
        """An empty corpus yields the bare result skeleton."""
        from app import clustering
        with self._with_corpus([]):
            result = run(clustering.build_hierarchy())
        assert result["meta"]["total"] == 0
        assert result["clusters"] == []
        assert result["singletons"] == []

    def test_single_item_becomes_singleton(self):
        from app import clustering
        with self._with_corpus(_make_items(1)):
            result = run(clustering.build_hierarchy(threshold=0.5))
        assert len(result["singletons"]) == 1
        assert result["clusters"] == []

    def test_meta_fields_present(self):
        from app import clustering
        with self._with_corpus(_make_items(4)):
            meta = run(clustering.build_hierarchy())["meta"]
        assert {"total", "threshold", "num_clusters", "num_singletons"} <= meta.keys()

    def test_threshold_affects_cluster_count(self):
        """Identical embeddings always collapse into one cluster below 1.0."""
        from app import clustering
        shared = [1.0, 0.0, 0.0]
        corpus = [
            {**_make_items(1)[0], "drucksache": f"18/{idx}", "embedding": shared}
            for idx in range(3)
        ]
        with self._with_corpus(corpus):
            result = run(clustering.build_hierarchy(threshold=0.5))
        assert len(result["clusters"]) == 1
        assert result["clusters"][0]["size"] == 3
|
|
|
|
|
|
# ─── find_similar_assessments (async, DB gemockt) ────────────────────────────
|
|
|
|
class TestFindSimilarAssessments:
    """find_similar_assessments against a patched assessment loader."""

    @staticmethod
    def _with_corpus(corpus):
        """Return a patch context that serves *corpus* instead of hitting the DB."""
        from app import clustering

        async def fake_load(bundesland=None):
            return corpus

        return patch.object(clustering, "load_assessment_items", side_effect=fake_load)

    def test_returns_empty_for_unknown_drucksache(self):
        from app import clustering
        with self._with_corpus(_make_items(3)):
            assert run(clustering.find_similar_assessments("99/9999")) == []

    def test_returns_top_k_results(self):
        from app import clustering
        corpus = _make_items(5)
        with self._with_corpus(corpus):
            hits = run(clustering.find_similar_assessments(corpus[0]["drucksache"], top_k=3))
        assert len(hits) == 3

    def test_excludes_self(self):
        from app import clustering
        corpus = _make_items(5)
        target = corpus[0]["drucksache"]
        with self._with_corpus(corpus):
            hits = run(clustering.find_similar_assessments(target, top_k=10))
        assert target not in [hit["drucksache"] for hit in hits]

    def test_result_sorted_by_similarity_descending(self):
        from app import clustering
        corpus = _make_items(5)
        with self._with_corpus(corpus):
            hits = run(clustering.find_similar_assessments(corpus[0]["drucksache"], top_k=4))
        scores = [hit["similarity"] for hit in hits]
        assert scores == sorted(scores, reverse=True)

    def test_result_fields_present(self):
        from app import clustering
        corpus = _make_items(3)
        with self._with_corpus(corpus):
            hits = run(clustering.find_similar_assessments(corpus[0]["drucksache"], top_k=2))
        required = {"drucksache", "title", "bundesland", "fraktionen",
                    "gwoe_score", "empfehlung", "similarity"}
        for hit in hits:
            assert required <= hit.keys()

    def test_single_item_corpus_returns_empty(self):
        """A one-item corpus leaves nothing after self-exclusion."""
        from app import clustering
        corpus = _make_items(1)
        with self._with_corpus(corpus):
            assert run(clustering.find_similar_assessments(corpus[0]["drucksache"])) == []
|