test(#134): clustering.py Coverage 82.3% → 99.3%

- TestUnionFindRankSwap: rank-Asymmetrie-Branch (Line 69)
- TestLoadAssessmentItems: tmp-DB mit korrekten + kaputten Embeddings,
  bundesland-Filter, vollstaendiges Item-Schema
- TestBuildHierarchySubclusters:
  - max_cluster_size=3 zwingt grossen Cluster zu sub-clustern
  - kleiner Cluster bekommt subclusters=None

Total Coverage: 49.9% → 50.4% (50%-Marke ueberschritten),
718 → 724 Tests.
This commit is contained in:
Dotty Dotter 2026-04-28 11:02:58 +02:00
parent 999926b5f3
commit 581d1591b8

View File

@ -436,3 +436,156 @@ class TestFindSimilarAssessments:
result = run(clustering.find_similar_assessments(items[0]["drucksache"]))
assert result == []
# ─── Coverage-Backfill (#134) ────────────────────────────────────────────────
class TestUnionFindRankSwap:
    """Cover the rank-asymmetry branch of ``UnionFind.union``.

    When the two roots passed to ``union`` have different ranks, the
    lower-ranked root has to be attached beneath the higher-ranked one
    (reported as line 69 of ``clustering.py`` by the coverage run).
    """

    def test_smaller_rank_attaches_to_larger(self):
        """A singleton merged into a taller tree ends up under that tree's root."""
        from app.clustering import UnionFind

        big = UnionFind(8)
        # Two pair-wise unions followed by a union of the two pairs. Assuming
        # standard union-by-rank, each tie bumps the surviving root's rank,
        # so the resulting tree has a strictly higher rank than a singleton.
        big.union(0, 1)
        big.union(2, 3)
        big.union(0, 2)
        root_before = big.find(0)

        # Node 4 has never been merged, so its rank is lower than the rank of
        # the tree containing 0: this call exercises the asymmetric branch.
        big.union(4, 0)

        # Both nodes now belong to the same set ...
        assert big.find(4) == big.find(0)
        # ... and the taller tree kept its root, i.e. the singleton was
        # attached to the big tree and not the other way round.
        assert big.find(0) == root_before
class TestLoadAssessmentItems:
    """Tests for the async loader ``load_assessment_items`` against a temp DB."""

    def _build_db(self, tmp_path):
        """Create ``clust.db`` below *tmp_path* and fill it with fixture rows.

        Three rows are inserted into a fresh ``assessments`` table:

        * ``18/1`` - bundesland ``NRW``, embedding stored as JSON bytes
        * ``18/2`` - bundesland ``NRW``, embedding bytes that are not JSON
        * ``8/1``  - bundesland ``MV``, embedding stored as JSON bytes

        Returns the path of the created database file.
        """
        import json
        import sqlite3
        from contextlib import closing

        db_path = tmp_path / "clust.db"
        emb_ok = json.dumps([0.1, 0.2, 0.3]).encode()
        # Column order follows the CREATE TABLE statement below.
        rows = [
            # Well-formed embedding.
            ("18/1", "T1", '["CDU"]', "2026-04-01", "NRW",
             7.0, "x", "Empfohlen", "+", '["Klima"]', emb_ok),
            # Broken embedding (invalid JSON).
            ("18/2", "T2", '["SPD"]', "2026-04-02", "NRW",
             5.0, "y", "Empfohlen", "+", '["Klima"]', b"not-json"),
            # Different bundesland, used by the filter test.
            ("8/1", "T3", '["AfD"]', "2026-04-03", "MV",
             3.0, "z", "Ablehnen", "-", "[]", emb_ok),
        ]

        # closing() guarantees the connection is released even when one of
        # the statements raises; sqlite3's own context manager only manages
        # the transaction, it does not close the connection.
        with closing(sqlite3.connect(str(db_path))) as conn:
            conn.execute("""
                CREATE TABLE assessments (
                    drucksache TEXT PRIMARY KEY, title TEXT,
                    fraktionen TEXT, datum TEXT, bundesland TEXT,
                    gwoe_score REAL, link TEXT,
                    empfehlung TEXT, empfehlung_symbol TEXT,
                    themen TEXT, summary_embedding BLOB
                )
            """)
            conn.executemany(
                "INSERT INTO assessments VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                rows,
            )
            conn.commit()
        return db_path

    def _load(self, tmp_path, monkeypatch, **kwargs):
        """Point ``settings.db_path`` at a fresh fixture DB and run the loader.

        Keyword arguments are forwarded to ``load_assessment_items``.
        """
        from app.config import settings
        from app import clustering

        db = self._build_db(tmp_path)
        monkeypatch.setattr(settings, "db_path", str(db))
        return run(clustering.load_assessment_items(**kwargs))

    def test_loads_only_valid_embeddings(self, tmp_path, monkeypatch):
        """Rows whose embedding cannot be decoded are skipped."""
        items = self._load(tmp_path, monkeypatch)
        ids = sorted(item["drucksache"] for item in items)
        # 18/2 carries the broken embedding; the two valid rows remain.
        assert ids == ["18/1", "8/1"]

    def test_bundesland_filter(self, tmp_path, monkeypatch):
        """Filtering by bundesland returns only rows from that state."""
        items = self._load(tmp_path, monkeypatch, bundesland="NRW")
        ids = [item["drucksache"] for item in items]
        assert ids == ["18/1"]

    def test_loaded_item_fields_present(self, tmp_path, monkeypatch):
        """Every loaded item exposes the complete set of expected keys."""
        items = self._load(tmp_path, monkeypatch, bundesland="NRW")
        assert items
        item = items[0]
        expected_keys = (
            "drucksache", "title", "fraktionen", "datum", "link",
            "bundesland", "gwoe_score", "empfehlung",
            "empfehlung_symbol", "themen", "embedding",
        )
        # Collect all absent keys so a failure names every one of them.
        missing = [key for key in expected_keys if key not in item]
        assert not missing, f"missing keys: {missing}"
class TestBuildHierarchySubclusters:
    """Cover the sub-clustering step of ``build_hierarchy``.

    A cluster larger than ``max_cluster_size`` is expected to be
    sub-clustered (lines 256-262 of ``clustering.py`` per the coverage run);
    smaller clusters carry ``subclusters = None``.
    """

    def _build(self, items, **kwargs):
        """Run ``build_hierarchy(**kwargs)`` with the loader returning *items*."""
        from app import clustering
        from unittest.mock import patch

        async def fake_load(bundesland=None):
            return items

        with patch.object(clustering, "load_assessment_items", side_effect=fake_load):
            return run(clustering.build_hierarchy(**kwargs))

    def test_large_cluster_gets_subclustered(self):
        """Six near-identical items exceed ``max_cluster_size=3``."""
        # Six items whose embeddings differ only in the first component.
        # NOTE(review): these vectors are collinear, so under a cosine-style
        # similarity they are all identical - confirm against the metric used
        # by build_hierarchy if distinct sub-clusters are ever asserted here.
        items = [
            {
                **_make_items(1)[0],
                "drucksache": f"18/{i}",
                "embedding": [1.0 + 0.01 * i, 0.0, 0.0],
            }
            for i in range(6)
        ]

        # max_cluster_size=3 forces the six-item cluster to be sub-clustered.
        result = self._build(
            items,
            threshold=0.95, max_cluster_size=3, subcluster_threshold=0.999,
        )

        assert result["clusters"]
        # At least one cluster must carry sub-clusters.
        assert any(c.get("subclusters") for c in result["clusters"])

    def test_small_cluster_has_subclusters_none(self):
        """A cluster within ``max_cluster_size`` is not sub-clustered."""
        items = _make_items(2)
        # Identical embeddings so both items land in the same cluster.
        items[0]["embedding"] = [1.0, 0.0, 0.0]
        items[1]["embedding"] = [1.0, 0.0, 0.0]

        result = self._build(items, threshold=0.5, max_cluster_size=10)

        # Guard against a vacuous pass: with no clusters the loop below
        # would not execute a single assertion.
        assert result["clusters"]
        for cluster in result["clusters"]:
            assert cluster["subclusters"] is None