#!/usr/bin/env python3
"""#14 Leerstellen-Detektor: Embedding-Cluster-Analyse zur Identifikation von Diskurslücken.
Bildet Cluster über alle Paragraphen, misst Dichte pro Podcast,
identifiziert asymmetrische und leere Cluster.
Nutzung:
python3 detect_gaps.py [db-pfad] [output-json]
"""
import json
import sys
import sqlite3
import numpy as np
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
OUTPUT = sys.argv[2] if len(sys.argv) > 2 else "data/gaps_analysis.json"
N_CLUSTERS = 30  # finer resolution
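
# Embeddings are assumed to be stored as raw float32 blobs in native byte
# order, matching the np.frombuffer call below. Round-trip sketch with a
# hypothetical vector:
#   blob = np.arange(4, dtype=np.float32).tobytes()
#   vec = np.frombuffer(blob, dtype=np.float32)  # -> array([0., 1., 2., 3.])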
def load_embeddings(db_path):
    db = sqlite3.connect(db_path)
    db.row_factory = sqlite3.Row
    rows = db.execute(
        "SELECT p.id, p.podcast_id, p.episode_id, p.idx, p.text, p.embedding, e.title, e.guest "
        "FROM paragraphs p JOIN episodes e ON p.podcast_id = e.podcast_id AND p.episode_id = e.id "
        "WHERE p.embedding IS NOT NULL"
    ).fetchall()
    db.close()
    meta = []
    vectors = []
    for r in rows:
        meta.append({
            "id": r["id"], "podcast_id": r["podcast_id"],
            "episode_id": r["episode_id"], "idx": r["idx"],
            "text": r["text"][:200], "title": r["title"], "guest": r["guest"]
        })
        vectors.append(np.frombuffer(r["embedding"], dtype=np.float32))
    return np.array(vectors), meta

def kmeans_simple(vectors, k, max_iter=50):
    """Simple k-means without an sklearn dependency."""
    n = len(vectors)
    # Init: pick k random points as starting centroids (fixed seed for reproducibility)
    rng = np.random.default_rng(42)
    indices = rng.choice(n, k, replace=False)
    centroids = vectors[indices].copy()
    labels = np.zeros(n, dtype=int)
    for _ in range(max_iter):
        # Assignment step: nearest centroid by Euclidean distance
        dists = np.linalg.norm(vectors[:, None] - centroids[None, :], axis=2)
        new_labels = np.argmin(dists, axis=1)
        if np.all(new_labels == labels):
            break
        labels = new_labels
        # Update step: move each centroid to the mean of its members
        for j in range(k):
            mask = labels == j
            if mask.sum() > 0:
                centroids[j] = vectors[mask].mean(axis=0)
    return labels, centroids
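
# A minimal usage sketch for kmeans_simple (hypothetical data, not part of
# the pipeline):
#   pts = np.random.default_rng(0).random((100, 8)).astype(np.float32)
#   labels, centroids = kmeans_simple(pts, k=5)
#   # labels.shape == (100,), centroids.shape == (5, 8)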
def main():
    print("Loading embeddings…")
    vectors, meta = load_embeddings(DB_PATH)
    print(f" {len(vectors)} paragraphs loaded.")
    # Normalize to unit length (guard against zero vectors)
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1
    vectors_norm = vectors / norms
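    # Note: for unit vectors, squared Euclidean distance equals
    # 2 * (1 - cosine similarity), so k-means on the normalized rows
    # approximates clustering by cosine similarity.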
print(f"Clustere in {N_CLUSTERS} Gruppen…")
labels, centroids = kmeans_simple(vectors_norm, N_CLUSTERS)
    # Analyze clusters
    podcasts = sorted(set(m["podcast_id"] for m in meta))
    clusters = []
    for c in range(N_CLUSTERS):
        mask = labels == c
        indices = np.where(mask)[0]
        members = [meta[i] for i in indices]
        # Count per podcast
        per_podcast = {p: 0 for p in podcasts}
        for m in members:
            per_podcast[m["podcast_id"]] += 1
        # Representative texts (closest to centroid)
        if len(indices) > 0:
            dists = np.linalg.norm(vectors_norm[indices] - centroids[c], axis=1)
            top_indices = indices[np.argsort(dists)[:5]]
            representative = [{"text": meta[i]["text"], "episode": meta[i]["episode_id"],
                               "podcast": meta[i]["podcast_id"], "guest": meta[i]["guest"]}
                              for i in top_indices]
        else:
            representative = []
        # Derive topic label from representative texts
        words = " ".join(m["text"][:100] for m in members[:20]).lower().split()
        # Simple word frequency (exclude common German stop words)
        stop = {"der", "die", "das", "und", "in", "von", "zu", "den", "ist", "ein", "eine", "es", "mit",
                "auf", "für", "an", "sich", "nicht", "auch", "dass", "wir", "man", "aber", "des", "dem",
                "werden", "oder", "als", "wie", "hat", "ich", "sind", "was", "so", "haben", "dann",
                "wenn", "noch", "schon", "kann", "wird", "hier", "über", "nach", "nur", "bei", "da",
                "diese", "dieser", "dieses", "einem", "einer", "also", "ja", "mal", "war", "sehr",
                "gibt", "aus", "zum", "zur", "mehr", "immer", "weil", "uns", "sie", "er", "vom"}
        word_freq = {}
        for w in words:
            w = w.strip(".,;:!?\"'()[]")
            if len(w) > 3 and w not in stop:
                word_freq[w] = word_freq.get(w, 0) + 1
        top_words = sorted(word_freq.items(), key=lambda x: -x[1])[:5]
        label = ", ".join(w for w, _ in top_words) if top_words else f"Cluster {c}"
        clusters.append({
            "id": c,
            "label": label,
            "size": int(mask.sum()),
            "per_podcast": per_podcast,
            "representative": representative,
        })
    # Sort by size
    clusters.sort(key=lambda x: -x["size"])
    # Identify gaps: a cluster counts as a gap for podcast p if p contributes
    # at most 2 paragraphs while the other podcasts contribute more than 10.
    gaps = []
    for cl in clusters:
        total = cl["size"]
        if total < 3:
            continue
        for p in podcasts:
            count = cl["per_podcast"].get(p, 0)
            other_total = total - count
            if other_total > 10 and count <= 2:
                gaps.append({
                    "cluster_label": cl["label"],
                    "cluster_size": total,
                    "missing_in": p,
                    "present_in_count": other_total,
                    "representative": cl["representative"][:3],
                })
    # Sort gaps by how asymmetric they are
    gaps.sort(key=lambda x: -x["present_in_count"])
    result = {
        "total_paragraphs": len(meta),
        "podcasts": podcasts,
        "n_clusters": N_CLUSTERS,
        "clusters": clusters,
        "gaps": gaps[:30],
    }
    with open(OUTPUT, "w") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print(f"\n{len(clusters)} clusters, {len(gaps)} gaps identified.")
    print("\nTop gaps:")
    for g in gaps[:10]:
        print(f" [{g['missing_in']}] missing: \"{g['cluster_label']}\" ({g['present_in_count']} paragraphs in the other podcast)")
    print("\nCluster sizes:")
    for cl in clusters[:15]:
        bar = " | ".join(f"{p}:{cl['per_podcast'][p]}" for p in podcasts)
        print(f" {cl['label'][:40]:40s} ({cl['size']:4d}) — {bar}")
    print(f"\nResult: {OUTPUT}")
if __name__ == "__main__":
    main()