podcast-mindmap/scripts/detect_narrative_shift.py
Dotty Dotter 78d66bef21 #12 word-highlighting frontend, #14 gap detector, #15 narrative shift,
#13/#16/#17/#18 Qwen analysis scripts

- Frontend: word-level highlighting in the transcript: each word is rendered as a
  <span> with a timestamp, karaoke-style sync during playback, CSS classes
  word-active/word-spoken
- API: /api/.../words endpoint returns word-level timestamps
- #14 detect_gaps.py: K-Means clustering over the 3727 embeddings, identifies
  gaps (topics a given podcast never covers); see the sketch below the commit
  message. Output: gaps_analysis.json
- #15 detect_narrative_shift.py: embedding drift per theme across the episode
  sequence, detects framing shifts. Output: narrative_shifts.json
- #13 analyse_arguments.py: Qwen classifies logical relations (extends,
  contradicts, supports, qualifies) between semantically similar paragraphs
- #16 extract_claims.py: Qwen extracts checkable claims (numbers, statistics)
- #17 extract_questions.py: Qwen extracts and classifies questions
- #18 curate_debates.py: Qwen curates cross-podcast juxtapositions
- run_all_qwen.sh: sequential pipeline for all Qwen tasks (avoids DB locks)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-23 22:29:41 +02:00
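
detect_gaps.py itself is not included in this file view; what follows is a minimal sketch of the K-Means gap detection described under #14, assuming scikit-learn is available and the same paragraphs table and float32 embedding blobs that detect_narrative_shift.py below reads. The cluster count and the coverage heuristic are assumptions, not the actual implementation.

#!/usr/bin/env python3
"""Sketch of the #14 approach only; not the actual detect_gaps.py.
Cluster all paragraph embeddings, then flag clusters that a podcast
never touches as candidate gaps."""
import sqlite3

import numpy as np
from sklearn.cluster import KMeans  # assumption: scikit-learn is installed

N_CLUSTERS = 40  # assumption; the real script may pick k differently

db = sqlite3.connect("data/db.sqlite")
rows = db.execute(
    "SELECT podcast_id, embedding FROM paragraphs WHERE embedding IS NOT NULL"
).fetchall()
db.close()

podcast_ids = [pid for pid, _ in rows]
X = np.vstack([np.frombuffer(blob, dtype=np.float32) for _, blob in rows])

labels = KMeans(n_clusters=N_CLUSTERS, n_init=10, random_state=0).fit_predict(X)

# Which clusters does each podcast cover at least once?
coverage = {}
for pid, label in zip(podcast_ids, labels):
    coverage.setdefault(pid, set()).add(int(label))

# Clusters none of a podcast's paragraphs fall into are its candidate gaps.
all_clusters = set(range(N_CLUSTERS))
for pid, covered in sorted(coverage.items()):
    gaps = sorted(all_clusters - covered)
    print(f"{pid}: {len(gaps)} uncovered clusters -> {gaps}")

The real script writes its findings to gaps_analysis.json rather than printing them.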

168 lines
5.9 KiB
Python

#!/usr/bin/env python3
"""#15 Narrative Shift Detection: Wie verschiebt sich das Framing über die Zeit?
Berechnet Embedding-Drift pro Themen-Cluster über die Episodenreihenfolge.
Spitzen = Framing-Wechsel.
Nutzung:
python3 detect_narrative_shift.py [db-pfad] [output-json]
"""
import json
import sys
import sqlite3
import numpy as np
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
OUTPUT = sys.argv[2] if len(sys.argv) > 2 else "data/narrative_shifts.json"
# Theme keywords for cluster assignment (German, matching the transcript language)
THEMES = {
"klimaschutz": ["klima", "klimawandel", "co2", "emission", "erderwärmung", "klimaschutz", "temperatur", "paris"],
"sicherheit": ["sicherheit", "verteidigung", "militär", "nato", "krieg", "frieden", "abschreckung", "bundeswehr"],
"demokratie": ["demokratie", "demokratisch", "wahl", "parlament", "abstimmung", "beteiligung", "grundgesetz"],
"ungleichheit": ["ungleichheit", "armut", "vermögen", "reichtum", "einkommen", "verteilung", "gini"],
"digitalisierung": ["digital", "plattform", "algorithmus", "google", "meta", "tiktok", "internet", "daten"],
"bildung": ["bildung", "schule", "universität", "lernen", "ausbildung", "studier", "lehre"],
"gesundheit": ["gesundheit", "krankheit", "allergie", "medizin", "prävention", "gesundheitssystem"],
"migration": ["migration", "flucht", "integration", "zuwanderung", "fachkräfte", "asyl"],
"wirtschaft": ["wirtschaft", "wachstum", "bip", "konjunktur", "inflation", "arbeitsmarkt", "produktivität"],
"freiheit": ["freiheit", "grundrecht", "diskriminierung", "gleichstellung", "meinungsfreiheit"],
}
def load_data(db_path):
    """Load all embedded paragraphs, ordered chronologically per podcast."""
    db = sqlite3.connect(db_path)
    db.row_factory = sqlite3.Row
    # e.staffel = season number (German column name)
    rows = db.execute(
        "SELECT p.podcast_id, p.episode_id, p.idx, p.text, p.embedding, e.staffel "
        "FROM paragraphs p JOIN episodes e ON p.podcast_id = e.podcast_id AND p.episode_id = e.id "
        "WHERE p.embedding IS NOT NULL "
        "ORDER BY p.podcast_id, e.staffel, p.episode_id, p.idx"
    ).fetchall()
    db.close()
    return rows
def classify_theme(text):
    """Assign a paragraph to a theme (keyword match)."""
    text_lower = text.lower()
    scores = {}
    for theme, keywords in THEMES.items():
        score = sum(1 for kw in keywords if kw in text_lower)
        if score > 0:
            scores[theme] = score
    if not scores:
        return None
    return max(scores, key=scores.get)
def cosine_distance(a, b):
    na, nb = np.linalg.norm(a), np.linalg.norm(b)
    if na == 0 or nb == 0:
        return 1.0
    return 1.0 - np.dot(a, b) / (na * nb)
def main():
    print("Loading data…")
    rows = load_data(DB_PATH)
    print(f" {len(rows)} paragraphs loaded.")
    # Group by podcast → episode → theme
    podcasts = {}
    for r in rows:
        pid = r["podcast_id"]
        eid = r["episode_id"]
        text = r["text"]
        vec = np.frombuffer(r["embedding"], dtype=np.float32)
        theme = classify_theme(text)
        if theme is None:
            continue
        if pid not in podcasts:
            podcasts[pid] = {}
        if eid not in podcasts[pid]:
            podcasts[pid][eid] = {"staffel": r["staffel"], "themes": {}}
        if theme not in podcasts[pid][eid]["themes"]:
            podcasts[pid][eid]["themes"][theme] = []
        podcasts[pid][eid]["themes"][theme].append(vec)
    # Compute centroid per (podcast, episode, theme)
    shifts = {}
    for pid, episodes in podcasts.items():
        ep_list = sorted(episodes.keys())  # lexicographic = chronological for SxEy IDs (assumes zero-padded numbers)
        for theme in THEMES:
            centroids = []
            ep_labels = []
            for eid in ep_list:
                if theme not in episodes[eid]["themes"]:
                    continue
                vecs = np.array(episodes[eid]["themes"][theme])
                centroid = vecs.mean(axis=0)
                centroids.append(centroid)
                ep_labels.append(eid)
            if len(centroids) < 3:
                continue
            # Compute drift between consecutive episodes
            drifts = []
            for i in range(1, len(centroids)):
                drift = cosine_distance(centroids[i - 1], centroids[i])
                drifts.append({
                    "from": ep_labels[i - 1],
                    "to": ep_labels[i],
                    "drift": round(float(drift), 4),
                })
            # Find spikes (> 1.5 * median)
            drift_vals = [d["drift"] for d in drifts]
            median = float(np.median(drift_vals))
            mean = float(np.mean(drift_vals))
            spikes = [d for d in drifts if d["drift"] > median * 1.5]
            key = f"{pid}/{theme}"
            shifts[key] = {
                "podcast": pid,
                "theme": theme,
                "episodes": ep_labels,
                "n_episodes": len(ep_labels),
                "mean_drift": round(mean, 4),
                "median_drift": round(median, 4),
                "max_drift": round(max(drift_vals), 4),
                "drifts": drifts,
                "spikes": spikes,
            }
    # Sort by max drift
    sorted_shifts = sorted(shifts.values(), key=lambda x: -x["max_drift"])
    result = {
        "total_themes_tracked": len(sorted_shifts),
        "themes": list(THEMES.keys()),
        "shifts": sorted_shifts,
    }
    with open(OUTPUT, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print(f"\n{len(sorted_shifts)} theme trajectories computed.")
    print("\nLargest framing shifts:")
    for s in sorted_shifts[:10]:
        spike_info = ""
        if s["spikes"]:
            spike_info = " | Spikes: " + ", ".join(f"{sp['from']}→{sp['to']} ({sp['drift']:.3f})" for sp in s["spikes"][:3])
print(f" {s['podcast']}/{s['theme']:15s} — max_drift={s['max_drift']:.4f}, mean={s['mean_drift']:.4f}{spike_info}")
print(f"\nErgebnis: {OUTPUT}")
if __name__ == "__main__":
    main()
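
For reference, a minimal, hypothetical consumer of the narrative_shifts.json written above (not part of this commit), printing the strongest spike for each of the top tracked podcast/theme pairs:

#!/usr/bin/env python3
"""Hypothetical reader for narrative_shifts.json; field names follow the
result dict built in detect_narrative_shift.py above."""
import json

with open("data/narrative_shifts.json", encoding="utf-8") as f:
    result = json.load(f)

for shift in result["shifts"][:5]:
    if not shift["spikes"]:
        continue
    # Pick the single largest drift spike in this theme's trajectory.
    top = max(shift["spikes"], key=lambda sp: sp["drift"])
    print(f"{shift['podcast']}/{shift['theme']}: "
          f"{top['from']} -> {top['to']} (drift={top['drift']:.3f})")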