- 16 aktive BL-Adapter + BUND (parlamente.py 3397 LOC) - drucksache_typen.py: BL-spezifische Typ-Normalisierung (#127) - mail.py: SMTP + Daily-Digest (#124) - clustering.py: Embedding-Naehe-Graph + Bubble-Chart (#105) - redline_utils.py: §INS§/§DEL§-Parser + PDF-Cite-URL-Builder - embeddings v3->v4 Migration (#123, ADR 0006) - chart.js + d3.v7 als statische Assets fuer Auswertungen-Cluster Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""Antrag-Clustering via Cosine-Similarity + Union-Find (#105).
|
|
|
|
Uses the v4 embeddings from assessments.summary_embedding (populated by #123)
and builds a hierarchical cluster structure without external dependencies
(no sklearn, no numpy — pure Python is sufficient for <500 assessments).

Algorithm: connected components via union-find over edges with
cosine similarity ≥ threshold. Level 0 = all motions, level 1 = tighter clusters.
Clusters with more than 30 members are split once more in a second pass with a
higher threshold (a single extra level, not an unbounded recursion).
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
from collections import Counter
|
|
from typing import Optional
|
|
|
|
import aiosqlite
|
|
|
|
from .config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Cosine-similarity thresholds
# Empirically calibrated against the production DB (57 assessments, 2026-04-11):
#   0.50  → 6 meaningful clusters + 26 singletons (best default)
#   0.55  → 5 tighter clusters
#   0.60  → 4 small clusters, too strict (most topically similar
#           motions drop out)
#   0.70+ → almost all singletons
# v4 embeddings of German parliamentary motions cluster at ~0.50.
# NOTE(review): the calibration notes above call 0.50 the best default, but
# DEFAULT_THRESHOLD is set to 0.55 — confirm which value is intended.
DEFAULT_THRESHOLD = 0.55
SUBCLUSTER_THRESHOLD = 0.70
MAX_CLUSTER_SIZE = 30  # clusters larger than this are sub-clustered
|
|
|
|
|
|
# ─── Math-Helpers ───────────────────────────────────────────────────────────
|
|
|
|
def _cosine(a: list[float], b: list[float]) -> float:
|
|
dot = sum(x * y for x, y in zip(a, b))
|
|
na = math.sqrt(sum(x * x for x in a))
|
|
nb = math.sqrt(sum(x * x for x in b))
|
|
if na == 0 or nb == 0:
|
|
return 0.0
|
|
return dot / (na * nb)
|
|
|
|
|
|
class UnionFind:
    """Disjoint-set forest with path compression and union by rank."""

    def __init__(self, n: int):
        # Every element starts as the root of its own one-element tree.
        self.parent = list(range(n))
        self.rank = [0] * n

    def find(self, x: int) -> int:
        """Return the representative (root) of the set containing *x*."""
        parent = self.parent
        # First pass: walk up to the root, remembering the nodes visited.
        trail = []
        while parent[x] != x:
            trail.append(x)
            x = parent[x]
        # Second pass (path compression): hang every visited node
        # directly below the root.
        for node in trail:
            parent[node] = x
        return x

    def union(self, a: int, b: int) -> None:
        """Merge the sets containing *a* and *b* (no-op if already merged)."""
        root_a, root_b = self.find(a), self.find(b)
        if root_a == root_b:
            return
        # Union by rank: make root_a the root of the higher-ranked tree so
        # the lower-ranked tree is always attached beneath it.
        if self.rank[root_a] < self.rank[root_b]:
            root_a, root_b = root_b, root_a
        self.parent[root_b] = root_a
        # Only when both ranks were equal does the merged tree get taller.
        if self.rank[root_a] == self.rank[root_b]:
            self.rank[root_a] += 1
|
|
|
|
|
|
# ─── DB-Lader ───────────────────────────────────────────────────────────────
|
|
|
|
async def load_assessment_items(
    bundesland: Optional[str] = None,
) -> list[dict]:
    """Load all assessments that have a non-NULL ``summary_embedding``.

    Args:
        bundesland: When truthy, only rows whose ``bundesland`` column equals
            this value are loaded; otherwise all rows are considered.

    Returns:
        One dict per usable row with the keys ``drucksache``, ``title``,
        ``fraktionen``, ``datum``, ``link``, ``bundesland``, ``gwoe_score``,
        ``empfehlung``, ``empfehlung_symbol``, ``themen`` and ``embedding``.
        ``fraktionen`` and ``themen`` are JSON-decoded (NULL/empty → ``[]``);
        ``embedding`` is the JSON-decoded ``summary_embedding``.

    Rows whose embedding cannot be decoded into a JSON array are logged with
    a warning and skipped, so one corrupt row does not abort the whole load.
    """
    sql = """
        SELECT drucksache, title, fraktionen, datum, link, bundesland,
               gwoe_score, empfehlung, empfehlung_symbol, themen,
               summary_embedding
        FROM assessments
        WHERE summary_embedding IS NOT NULL
    """
    params: list = []
    if bundesland:
        sql += " AND bundesland = ?"
        params.append(bundesland)

    items = []
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(sql, params) as cur:
            async for row in cur:
                raw = row["summary_embedding"]
                try:
                    # Accept both text and bytes-like column values;
                    # bytes(str) would raise and drop an otherwise valid row.
                    text = raw if isinstance(raw, str) else bytes(raw).decode()
                    vec = json.loads(text)
                    # Valid JSON that is not an array (e.g. a number or an
                    # object) cannot be used as a vector downstream.
                    if not isinstance(vec, list):
                        raise ValueError("embedding is not a JSON array")
                except (TypeError, ValueError):
                    # ValueError also covers UnicodeDecodeError and
                    # json.JSONDecodeError (both are subclasses).
                    logger.warning("bad embedding for %s", row["drucksache"])
                    continue
                items.append({
                    "drucksache": row["drucksache"],
                    "title": row["title"],
                    "fraktionen": json.loads(row["fraktionen"] or "[]"),
                    "datum": row["datum"],
                    "link": row["link"],
                    "bundesland": row["bundesland"],
                    "gwoe_score": row["gwoe_score"],
                    "empfehlung": row["empfehlung"],
                    "empfehlung_symbol": row["empfehlung_symbol"],
                    "themen": json.loads(row["themen"] or "[]"),
                    "embedding": vec,
                })
    return items
|
|
|
|
|
|
# ─── Clustering ─────────────────────────────────────────────────────────────
|
|
|
|
def _cluster_indices(items: list[dict], threshold: float) -> list[list[int]]:
    """Group item indices into connected components of the similarity graph.

    Nodes are the items; two items are linked when the cosine similarity of
    their ``"embedding"`` vectors is at least *threshold*. Every unordered
    pair of items is compared once.

    Returns:
        A list of index lists (one per component), largest component first.
    """
    count = len(items)
    forest = UnionFind(count)
    for left in range(count):
        for right in range(left + 1, count):
            similarity = _cosine(
                items[left]["embedding"], items[right]["embedding"]
            )
            if similarity >= threshold:
                forest.union(left, right)

    # Collect the members of every component under its representative.
    components: dict[int, list[int]] = {}
    for idx in range(count):
        representative = forest.find(idx)
        if representative not in components:
            components[representative] = []
        components[representative].append(idx)

    # Largest components first; the sort is stable, so equally sized
    # components keep their discovery order.
    ordered = list(components.values())
    ordered.sort(key=len, reverse=True)
    return ordered
|
|
|
|
|
|
def _dominant_fraktion(items: list[dict]) -> Optional[str]:
|
|
counts: Counter = Counter()
|
|
for item in items:
|
|
for f in item.get("fraktionen") or []:
|
|
counts[f] += 1
|
|
if not counts:
|
|
return None
|
|
return counts.most_common(1)[0][0]
|
|
|
|
|
|
def _cluster_label(items: list[dict]) -> str:
|
|
"""Generiert ein Cluster-Label aus den häufigsten Themen der Mitglieder.
|
|
|
|
Nimmt die Top-2-3 Themen die in der Mehrheit der Cluster-Mitglieder
|
|
vorkommen und kombiniert sie zu einem prägnanten Label.
|
|
Fallback: kürzester Titel.
|
|
"""
|
|
# Themen-Häufigkeit über alle Cluster-Mitglieder
|
|
themen_counts: Counter = Counter()
|
|
for item in items:
|
|
for thema in item.get("themen") or []:
|
|
themen_counts[thema] += 1
|
|
|
|
if themen_counts:
|
|
# Top-Themen die in ≥50% der Mitglieder vorkommen, max 3
|
|
threshold = max(1, len(items) // 2)
|
|
top = [t for t, c in themen_counts.most_common(5) if c >= threshold][:3]
|
|
if top:
|
|
return " · ".join(top)
|
|
|
|
# Fallback: kürzester Titel
|
|
titles = [i["title"] for i in items if i.get("title")]
|
|
if titles:
|
|
return min(titles, key=len)
|
|
return "Cluster"
|
|
|
|
|
|
def _cluster_summary(cluster_items: list[dict], include_edges: bool = False) -> dict:
    """Summarise one cluster for the API response.

    Args:
        cluster_items: The member items of the cluster.
        include_edges: When true, additionally emit per-member ``"nodes"``
            and the pairwise-similarity ``"edges"`` (used for force-graph
            rendering).

    Returns:
        A dict with ``size``, ``label``, ``dominant_fraktion``,
        ``avg_gwoe_score`` (mean of the non-None scores rounded to one
        decimal, or ``None`` if there are none) and ``drucksachen``; plus
        ``nodes`` and ``edges`` when *include_edges* is set.
    """
    known_scores = []
    for member in cluster_items:
        if member.get("gwoe_score") is not None:
            known_scores.append(member["gwoe_score"])
    if known_scores:
        mean_score = round(sum(known_scores) / len(known_scores), 1)
    else:
        mean_score = None

    summary = {
        "size": len(cluster_items),
        "label": _cluster_label(cluster_items),
        "dominant_fraktion": _dominant_fraktion(cluster_items),
        "avg_gwoe_score": mean_score,
        "drucksachen": [member["drucksache"] for member in cluster_items],
    }
    if not include_edges:
        return summary

    # Per-member detail records (for force-graph rendering).
    node_fields = (
        "drucksache",
        "title",
        "bundesland",
        "fraktionen",
        "gwoe_score",
        "empfehlung",
    )
    summary["nodes"] = [
        {field: member[field] for field in node_fields}
        for member in cluster_items
    ]

    # One edge per unordered member pair; "a"/"b" are positions within
    # cluster_items (and therefore within "nodes").
    links = []
    for left, first in enumerate(cluster_items):
        for right in range(left + 1, len(cluster_items)):
            similarity = _cosine(
                first["embedding"], cluster_items[right]["embedding"]
            )
            links.append({"a": left, "b": right, "sim": round(similarity, 3)})
    summary["edges"] = links
    return summary
|
|
|
|
|
|
async def build_hierarchy(
    bundesland: Optional[str] = None,
    threshold: float = DEFAULT_THRESHOLD,
    subcluster_threshold: float = SUBCLUSTER_THRESHOLD,
    max_cluster_size: int = MAX_CLUSTER_SIZE,
) -> dict:
    """Load assessments, cluster them and return a serialisable structure:

        {
          "meta": {"total": N, "threshold": 0.55, "subcluster_threshold": ...,
                   "max_cluster_size": ..., "bundesland": ...,
                   "num_clusters": ..., "num_singletons": ...},
          "clusters": [
            {"size": 12, "label": ..., "dominant_fraktion": ...,
             "drucksachen": [...], "nodes": [...], "edges": [...],
             "subclusters": [ ... ] | None},
            ...
          ],
          "singletons": [drucksache, drucksache, ...]
        }

    Args:
        bundesland: Optional filter passed through to the loader.
        threshold: Minimum cosine similarity for two items to be linked at
            the top level.
        subcluster_threshold: Minimum cosine similarity used when splitting
            an oversized cluster.
        max_cluster_size: Clusters with more members than this are split
            once more with *subcluster_threshold*.

    Sub-clustering is a single extra pass (not recursive). Members that end
    up alone in that pass are omitted from ``"subclusters"`` but stay listed
    in the parent cluster's ``"drucksachen"``. The ``"meta"`` block has the
    same keys whether or not any assessments were found.
    """
    items = await load_assessment_items(bundesland=bundesland)

    # With no items there is nothing to compare; fall through with an empty
    # grouping so the result has the same shape as in the non-empty case.
    top_groups = _cluster_indices(items, threshold) if items else []

    clusters_out: list[dict] = []
    singletons_out: list[str] = []

    for group in top_groups:
        if len(group) == 1:
            singletons_out.append(items[group[0]]["drucksache"])
            continue

        cluster_items = [items[i] for i in group]
        entry = _cluster_summary(cluster_items, include_edges=True)

        # Split oversized clusters once more with the stricter threshold.
        if len(cluster_items) > max_cluster_size:
            sub_groups = _cluster_indices(cluster_items, subcluster_threshold)
            entry["subclusters"] = [
                _cluster_summary([cluster_items[i] for i in sub_group])
                for sub_group in sub_groups
                if len(sub_group) > 1  # lone members form no sub-cluster
            ]
        else:
            entry["subclusters"] = None

        clusters_out.append(entry)

    return {
        "meta": {
            "total": len(items),
            "threshold": threshold,
            "subcluster_threshold": subcluster_threshold,
            "max_cluster_size": max_cluster_size,
            "bundesland": bundesland,
            "num_clusters": len(clusters_out),
            "num_singletons": len(singletons_out),
        },
        "clusters": clusters_out,
        "singletons": singletons_out,
    }
|
|
|
|
|
|
# ─── Ähnlichkeits-Suche für #108 Teil B ─────────────────────────────────────
|
|
|
|
async def find_similar_assessments(drucksache: str, top_k: int = 5) -> list[dict]:
    """Return the *top_k* assessments most similar to the given one.

    Similarity is the cosine similarity of the summary embeddings. Returns
    an empty list when no loaded assessment has the given ``drucksache``.

    Each result carries ``drucksache``, ``title``, ``bundesland``,
    ``fraktionen``, ``gwoe_score``, ``empfehlung`` and ``similarity``
    (rounded to three decimals), ordered from most to least similar.

    NOTE(review): the first item with a matching ``drucksache`` is used as
    the reference and every item with that same value is excluded from the
    results — confirm that ``drucksache`` is unique across Bundesländer.
    """
    candidates = await load_assessment_items()

    reference = None
    for entry in candidates:
        if entry["drucksache"] == drucksache:
            reference = entry
            break
    if reference is None:
        return []

    # Score everything except the reference itself, best match first.
    ranked = sorted(
        (
            (_cosine(reference["embedding"], entry["embedding"]), entry)
            for entry in candidates
            if entry["drucksache"] != drucksache
        ),
        key=lambda pair: pair[0],
        reverse=True,
    )

    results = []
    for score, entry in ranked[:top_k]:
        results.append({
            "drucksache": entry["drucksache"],
            "title": entry["title"],
            "bundesland": entry["bundesland"],
            "fraktionen": entry["fraktionen"],
            "gwoe_score": entry["gwoe_score"],
            "empfehlung": entry["empfehlung"],
            "similarity": round(score, 3),
        })
    return results
|