podcast-mindmap/backend/app.py
Dotty Dotter 7019a7a04e #20 Cross-podcast mindmap with cross data as bridges
Backend:
- /api/analyses/cross-network: aggregate endpoint that feeds the cross mindmap
  in a single roundtrip: theme_clusters, themes, episodes, top_quotes (isTopQuote)
  and all cross-link lists (debates, claim_belegt/-widerspricht/-erweitert,
  answers, similarity top-N per source episode + target podcast); see the
  client sketch after this list.
- Filters carried over from the existing endpoints: kein_bezug, error and
  outro boilerplate are ignored.
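
A minimal client sketch against the new endpoint (stdlib only; assumes a
backend running locally on http://localhost:8000):

    import json
    from urllib.request import urlopen

    url = "http://localhost:8000/api/analyses/cross-network?top_sim=3&min_score=0.65"
    with urlopen(url) as resp:  # one roundtrip for the whole payload
        payload = json.load(resp)

    # One counter per cross-link type, the same numbers the panel chips show.
    for link_type, links in payload["cross_links"].items():
        print(link_type, len(links))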

Frontend (CrossMindmapView rebuilt from scratch):
- Force graph with four layers: cross-theme clusters pinned to the center axis
  (gold, bold), solo clusters floating free, per-podcast themes as the second
  layer, episodes pulled into their podcast's half-plane via forceX, top quotes
  as dots on their episode node.
- Six cross-link types, each with its own style:
  cross-debate (purple), claim-belegt (green), claim-widerspricht (red),
  claim-erweitert (blue), answer (orange), similarity (light blue, dashed,
  off by default).
- Toggle panel in the top right (modeled on renderLinkToggles from #19), one
  per link type; toggling only updates the opacity, no rebuild of the
  simulation.
- Clicking a theme/episode/quote opens the matching single-podcast mode and
  navigates onward (showEpisode + playFrom).
- Clicking a cross cluster filters the mindmap down to its members
  (filterCluster, sketched below): themes, episodes and their cross lines are
  highlighted, the rest is dimmed.
- Panel on the right: counters per cross type as colored chips, cross-theme
  cluster cards, and the top debates as a direct entry point into DebatesView.
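
The cluster filter reduces to a membership set; a rough Python sketch of the
idea (the real implementation is JS in CrossMindmapView; names are
illustrative and the cluster member shape is an assumption):

    def cluster_member_ids(cluster, themes):
        # Assumed member shape: {"podcast_id": ..., "theme_id": ...}
        keep = set()
        theme_index = {(t["podcast_id"], t["id"]): t for t in themes}
        for m in cluster["members"]:
            t = theme_index.get((m["podcast_id"], m["theme_id"]))
            if not t:
                continue
            keep.add(f"theme:{m['podcast_id']}:{m['theme_id']}")
            for ep in t["episode_ids"]:
                keep.add(f"episode:{m['podcast_id']}:{ep}")
        return keep

    # Nodes whose id is in `keep` stay at full opacity; the rest is dimmed.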

Routing:
- New route /cross loads the cross mindmap directly; loadApp and the
  popstate handler support it analogously to /ldn and /neu-denken.

Live data: 246 nodes, 1422 lines (97 debates, 199 belegt, 58 widerspricht,
282 erweitert, 236 answers, 305 similarity).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 20:52:04 +02:00


"""FastAPI backend for podcast-mindmap."""
import json
import os
import numpy as np
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Query, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from database import get_db, init_db, get_all_embeddings
app = FastAPI(title="Podcast Mindmap API")
DATA_DIR = os.environ.get("DATA_DIR", "/data")
AUDIO_DIR = os.environ.get("AUDIO_DIR", "/audio")
STATIC_DIR = os.environ.get("STATIC_DIR", "/static")
# Cache embeddings in memory
_embeddings_cache = {}
def _load_embeddings(podcast_id: Optional[str] = None):
    """Load and cache embeddings."""
    key = podcast_id or "__all__"
    if key not in _embeddings_cache:
        vectors, meta = get_all_embeddings(podcast_id)
        _embeddings_cache[key] = (vectors, meta)
    return _embeddings_cache[key]


def _invalidate_cache():
    _embeddings_cache.clear()
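# Note: _invalidate_cache is not called anywhere in this module yet; invoke it
# after runtime imports or re-embeddings so stale cached vectors get evicted.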
# ── API Routes ──
@app.get("/api/podcasts")
def list_podcasts():
db = get_db()
rows = db.execute("SELECT * FROM podcasts").fetchall()
db.close()
return [dict(r) for r in rows]
@app.get("/api/podcasts/{podcast_id}")
def get_podcast(podcast_id: str):
db = get_db()
podcast = db.execute("SELECT * FROM podcasts WHERE id = ?", (podcast_id,)).fetchone()
if not podcast:
raise HTTPException(404, "Podcast not found")
staffeln = db.execute("SELECT * FROM staffeln WHERE podcast_id = ? ORDER BY id", (podcast_id,)).fetchall()
themes = db.execute("SELECT * FROM themes WHERE podcast_id = ?", (podcast_id,)).fetchall()
episodes = db.execute("SELECT * FROM episodes WHERE podcast_id = ? ORDER BY id", (podcast_id,)).fetchall()
quotes = db.execute("SELECT * FROM quotes WHERE podcast_id = ?", (podcast_id,)).fetchall()
db.close()
# Build mindmap_data compatible format
return {
"name": podcast["name"],
"host": podcast["host"],
"description": podcast["description"],
"staffeln": [dict(s) for s in staffeln],
"themes": [{**dict(t), "episodes": json.loads(t["episodes_json"])} for t in themes],
"episodes": [{"id": e["id"], "title": e["title"], "guest": e["guest"],
"staffel": e["staffel"], "audioFile": e["audio_file"]} for e in episodes],
"quotes": [{
"id": q["id"], "text": q["text"], "verbatim": q["verbatim"],
"speaker": q["speaker"], "episode": q["episode_id"],
"startTime": q["start_time"], "endTime": q["end_time"],
"isTopQuote": bool(q["is_top_quote"]),
"themes": json.loads(q["themes_json"]),
"audioFile": next((e["audio_file"] for e in episodes if e["id"] == q["episode_id"]), None)
} for q in quotes],
}
@app.get("/api/podcasts/{podcast_id}/network")
def get_podcast_network(podcast_id: str, top_per_episode: int = 2, min_score: float = 0.65):
"""Mindmap-Querverbindungen (#19): Top-N Episode-Episode-Links per Aehnlichkeit
sowie Episode-Pair-Aggregate aus argument_links."""
db = get_db()
# Episode-Episode similarity: pro source-Episode die staerksten N Targets
sim_rows = db.execute(
"SELECT source_episode, target_podcast, target_episode, MAX(score) AS top_score, COUNT(*) AS n_links "
"FROM semantic_links "
"WHERE podcast_id = ? AND source_episode != target_episode "
"GROUP BY source_episode, target_podcast, target_episode "
"HAVING top_score >= ? "
"ORDER BY source_episode, top_score DESC",
(podcast_id, min_score),
).fetchall()
# In Python auf top_per_episode pro Source einschraenken
per_src = {}
sim_links = []
for r in sim_rows:
src = r["source_episode"]
if per_src.get(src, 0) >= top_per_episode:
continue
sim_links.append({
"source_episode": src,
"target_podcast": r["target_podcast"],
"target_episode": r["target_episode"],
"score": r["top_score"],
"n_paragraph_links": r["n_links"],
})
per_src[src] = per_src.get(src, 0) + 1
# Argument-Links: aggregiert pro Episode-Pair und Relation
arg_links = []
if _table_exists(db, "argument_links"):
arg_rows = db.execute(
"SELECT source_episode, target_podcast, target_episode, relation, COUNT(*) AS n "
"FROM argument_links "
"WHERE source_podcast = ? AND relation NOT IN ('error', 'kein_bezug') "
"GROUP BY source_episode, target_podcast, target_episode, relation "
"ORDER BY n DESC",
(podcast_id,),
).fetchall()
# Aggregate per Episode-Pair
pair_map = {}
for r in arg_rows:
key = (r["source_episode"], r["target_podcast"], r["target_episode"])
pair_map.setdefault(key, {})[r["relation"]] = r["n"]
for (src, tp, te), counts in pair_map.items():
arg_links.append({
"source_episode": src,
"target_podcast": tp,
"target_episode": te,
"counts": counts,
})
db.close()
return {
"podcast_id": podcast_id,
"top_per_episode": top_per_episode,
"min_score": min_score,
"similarity_links": sim_links,
"argument_links": arg_links,
}
@app.get("/api/podcasts/{podcast_id}/transcript/{episode_id}")
def get_transcript(podcast_id: str, episode_id: str):
db = get_db()
paras = db.execute(
"SELECT idx, start_time, end_time, text FROM paragraphs WHERE podcast_id = ? AND episode_id = ? ORDER BY idx",
(podcast_id, episode_id)
).fetchall()
db.close()
return {"paragraphs": [{"start": p["start_time"], "end": p["end_time"], "text": p["text"]} for p in paras]}
@app.get("/api/podcasts/{podcast_id}/transcript/{episode_id}/words")
def get_words(podcast_id: str, episode_id: str):
"""Get word-level timestamps for an episode."""
db = get_db()
# Check if words table exists
try:
words = db.execute(
"SELECT segment_idx, word_idx, word, start_time, end_time FROM words "
"WHERE podcast_id = ? AND episode_id = ? ORDER BY segment_idx, word_idx",
(podcast_id, episode_id)
).fetchall()
except Exception:
db.close()
return {"words": [], "available": False}
db.close()
if not words:
return {"words": [], "available": False}
return {
"available": True,
"words": [{"seg": w["segment_idx"], "idx": w["word_idx"],
"word": w["word"], "start": w["start_time"], "end": w["end_time"]} for w in words]
}
def _table_exists(db, name: str) -> bool:
    return db.execute(
        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,)
    ).fetchone() is not None

@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/claims")
def get_episode_claims(podcast_id: str, episode_id: str, claim_type: Optional[str] = None):
"""Claims (Behauptungen) für eine Episode, mit Match-Anzahlen je Relation (#16 Stufe 2)."""
db = get_db()
if not _table_exists(db, "claims"):
db.close()
return {"available": False, "claims": []}
sql = ("SELECT id, paragraph_idx, claim_text, claim_type, verifiable, start_time "
"FROM claims WHERE podcast_id = ? AND episode_id = ?")
params = [podcast_id, episode_id]
if claim_type:
sql += " AND claim_type = ?"
params.append(claim_type)
sql += " ORDER BY paragraph_idx, id"
rows = db.execute(sql, params).fetchall()
claims_list = [dict(r) for r in rows]
# Match-Counts und besten Match je claim_id anhaengen, falls Tabelle existiert.
# kein_bezug wird gefiltert (dient nur als Verarbeitungs-Marker fuer das Skript).
if claims_list and _table_exists(db, "claim_matches"):
ids = [c["id"] for c in claims_list]
placeholder = ",".join("?" * len(ids))
match_rows = db.execute(
f"SELECT claim_id, relation, COUNT(*) c FROM claim_matches "
f"WHERE claim_id IN ({placeholder}) AND relation != 'kein_bezug' "
f"GROUP BY claim_id, relation",
ids,
).fetchall()
counts = {}
for r in match_rows:
counts.setdefault(r["claim_id"], {})[r["relation"]] = r["c"]
# bester Match je claim (fuer Quick-Link), kein_bezug ausblenden
best_rows = db.execute(
f"SELECT cm.claim_id, cm.relation, cm.target_podcast, cm.target_episode, "
f"cm.target_idx, cm.reason, cm.score "
f"FROM claim_matches cm "
f"WHERE cm.claim_id IN ({placeholder}) AND cm.relation != 'kein_bezug' "
f"AND cm.id IN (SELECT MIN(id) FROM claim_matches WHERE claim_id IN ({placeholder}) "
f"AND relation != 'kein_bezug' GROUP BY claim_id) ",
ids + ids,
).fetchall()
best = {r["claim_id"]: dict(r) for r in best_rows}
for c in claims_list:
c["match_counts"] = counts.get(c["id"], {})
c["best_match"] = best.get(c["id"])
db.close()
return {"available": True, "claims": claims_list}
@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/questions")
def get_episode_questions(podcast_id: str, episode_id: str, question_type: Optional[str] = None,
answered: Optional[str] = None):
"""Fragen einer Episode."""
db = get_db()
if not _table_exists(db, "questions"):
db.close()
return {"available": False, "questions": []}
sql = ("SELECT id, paragraph_idx, question_text, question_type, answered, "
"answered_by_podcast, answered_by_episode, answered_by_idx, start_time "
"FROM questions WHERE podcast_id = ? AND episode_id = ?")
params = [podcast_id, episode_id]
if question_type:
sql += " AND question_type = ?"
params.append(question_type)
if answered:
sql += " AND answered = ?"
params.append(answered)
sql += " ORDER BY paragraph_idx, id"
rows = db.execute(sql, params).fetchall()
db.close()
return {"available": True, "questions": [dict(r) for r in rows]}
@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/analyses-summary")
def get_episode_analyses_summary(podcast_id: str, episode_id: str):
"""Zähler für die Analyse-Datentöpfe einer Episode (für UI-Buttons)."""
db = get_db()
out = {}
if _table_exists(db, "claims"):
out["claims"] = db.execute(
"SELECT COUNT(*) FROM claims WHERE podcast_id = ? AND episode_id = ?",
(podcast_id, episode_id)
).fetchone()[0]
if _table_exists(db, "questions"):
out["questions"] = db.execute(
"SELECT COUNT(*) FROM questions WHERE podcast_id = ? AND episode_id = ?",
(podcast_id, episode_id)
).fetchone()[0]
out["questions_unanswered"] = db.execute(
"SELECT COUNT(*) FROM questions WHERE podcast_id = ? AND episode_id = ? AND answered = 'no'",
(podcast_id, episode_id)
).fetchone()[0]
db.close()
return out
@app.get("/api/analyses/gaps")
def get_gaps_analysis(min_size: int = 0, missing_in: Optional[str] = None, limit: int = 200):
"""Leerstellen-Analyse (#14): Cluster, die in mindestens einem Podcast fehlen."""
path = Path(DATA_DIR) / "gaps_analysis.json"
if not path.exists():
return {"available": False}
try:
with open(path) as f:
data = json.load(f)
except Exception:
return {"available": False}
gaps = data.get("gaps", [])
if min_size > 0:
gaps = [g for g in gaps if g.get("cluster_size", 0) >= min_size]
if missing_in:
gaps = [g for g in gaps if g.get("missing_in") == missing_in]
gaps = gaps[:limit]
return {
"available": True,
"total_paragraphs": data.get("total_paragraphs"),
"podcasts": data.get("podcasts", []),
"n_clusters": data.get("n_clusters"),
"clusters": data.get("clusters", []),
"gaps": gaps,
}
@app.get("/api/analyses/shifts")
def get_shifts_analysis(podcast: Optional[str] = None, theme: Optional[str] = None,
min_drift: float = 0.0, limit: int = 200):
"""Narrative-Shift-Analyse (#15): Drift zwischen aufeinanderfolgenden Episoden je Theme."""
path = Path(DATA_DIR) / "narrative_shifts.json"
if not path.exists():
return {"available": False}
try:
with open(path) as f:
data = json.load(f)
except Exception:
return {"available": False}
shifts = data.get("shifts", [])
if podcast:
shifts = [s for s in shifts if s.get("podcast") == podcast]
if theme:
shifts = [s for s in shifts if s.get("theme") == theme]
if min_drift > 0:
shifts = [s for s in shifts if s.get("max_drift", 0) >= min_drift]
shifts = shifts[:limit]
podcasts = sorted({s.get("podcast") for s in data.get("shifts", []) if s.get("podcast")})
return {
"available": True,
"total_themes_tracked": data.get("total_themes_tracked"),
"themes": data.get("themes", []),
"podcasts": podcasts,
"shifts": shifts,
}
@app.get("/api/analyses/cross-network")
def get_cross_network(top_sim: int = 3, min_score: float = 0.65):
"""Cross-Podcast-Mindmap-Aggregat (#20): alles, was die Cross-Mindmap braucht,
in einem Roundtrip — Theme-Cluster, Themen, Episoden, Top-Quotes und
Cross-Link-Listen (debates, claim-Pairs, answer-Pairs, similarity-Pairs)."""
db = get_db()
# Theme-Cluster aus theme_clusters.json
theme_clusters = []
cluster_path = Path(DATA_DIR) / "theme_clusters.json"
if cluster_path.exists():
try:
with open(cluster_path) as f:
tc = json.load(f)
theme_clusters = tc.get("clusters", [])
except Exception:
pass
podcasts = [dict(r) for r in db.execute("SELECT id, name FROM podcasts").fetchall()]
pids = [p["id"] for p in podcasts]
# Themen mit zugeordneten Episode-IDs
theme_rows = db.execute(
"SELECT podcast_id, id, label, description, color, episodes_json FROM themes"
).fetchall()
themes = []
for r in theme_rows:
try:
eps = json.loads(r["episodes_json"] or "[]")
except Exception:
eps = []
themes.append({
"podcast_id": r["podcast_id"],
"id": r["id"],
"label": r["label"],
"description": r["description"],
"color": r["color"],
"episode_ids": eps,
})
# Episoden + Staffel-Farbe
ep_rows = db.execute(
"SELECT e.podcast_id, e.id, e.title, e.guest, e.staffel, s.color "
"FROM episodes e LEFT JOIN staffeln s "
"ON e.podcast_id = s.podcast_id AND e.staffel = s.id "
"ORDER BY e.podcast_id, e.id"
).fetchall()
episodes = [dict(r) for r in ep_rows]
# Top-Quotes (isTopQuote)
q_rows = db.execute(
"SELECT id, podcast_id, episode_id, text, speaker, themes_json, start_time "
"FROM quotes WHERE is_top_quote = 1 ORDER BY podcast_id, episode_id"
).fetchall()
top_quotes = []
for q in q_rows:
try:
qt = json.loads(q["themes_json"] or "[]")
except Exception:
qt = []
top_quotes.append({
"id": q["id"], "podcast_id": q["podcast_id"], "episode_id": q["episode_id"],
"text": q["text"], "speaker": q["speaker"], "themes": qt,
"start_time": q["start_time"],
})
cross_links = {
"similarity": [],
"debates": [],
"claim_belegt": [],
"claim_widerspricht": [],
"claim_erweitert": [],
"answers": [],
}
# Similarity: top_sim pairs je (source_podcast, source_episode, target_podcast)
if len(pids) > 1:
sim_rows = db.execute(
"SELECT podcast_id, source_episode, target_podcast, target_episode, MAX(score) AS score "
"FROM semantic_links "
"WHERE podcast_id != target_podcast AND score >= ? "
"GROUP BY podcast_id, source_episode, target_podcast, target_episode "
"ORDER BY podcast_id, source_episode, score DESC",
(min_score,),
).fetchall()
per_src = {}
for r in sim_rows:
key = (r["podcast_id"], r["source_episode"], r["target_podcast"])
n = per_src.get(key, 0)
if n >= top_sim:
continue
cross_links["similarity"].append({
"a": f"{r['podcast_id']}:{r['source_episode']}",
"b": f"{r['target_podcast']}:{r['target_episode']}",
"score": float(r["score"]),
})
per_src[key] = n + 1
# Debates: jede zaehlt
if _table_exists(db, "debates"):
deb_rows = db.execute(
"SELECT id, source_podcast, source_episode, target_podcast, target_episode, topic "
"FROM debates WHERE topic IS NOT NULL AND topic != '' AND topic != 'error'"
).fetchall()
for r in deb_rows:
cross_links["debates"].append({
"a": f"{r['source_podcast']}:{r['source_episode']}",
"b": f"{r['target_podcast']}:{r['target_episode']}",
"topic": r["topic"], "debate_id": r["id"],
})
# Claim-Matches cross-podcast, je Episode-Pair die dominante Relation
if _table_exists(db, "claim_matches"):
cm_rows = db.execute(
"SELECT cl.podcast_id AS src_pid, cl.episode_id AS src_ep, "
"cm.target_podcast AS tgt_pid, cm.target_episode AS tgt_ep, "
"cm.relation, COUNT(*) AS n "
"FROM claim_matches cm JOIN claims cl ON cm.claim_id = cl.id "
"WHERE cm.target_podcast != cl.podcast_id AND cm.relation != 'kein_bezug' "
"GROUP BY cl.podcast_id, cl.episode_id, cm.target_podcast, cm.target_episode, cm.relation"
).fetchall()
# Aggregiere pro Pair: relation->count
pair_map = {}
for r in cm_rows:
key = (r["src_pid"], r["src_ep"], r["tgt_pid"], r["tgt_ep"])
pair_map.setdefault(key, {})[r["relation"]] = r["n"]
for (sp, se, tp, te), counts in pair_map.items():
for rel in ("belegt", "widerspricht", "erweitert"):
if counts.get(rel):
bucket = f"claim_{rel}"
cross_links[bucket].append({
"a": f"{sp}:{se}", "b": f"{tp}:{te}", "n": counts[rel],
})
# Question-Answer-Pairs cross-podcast
if _table_exists(db, "questions"):
ans_rows = db.execute(
"SELECT podcast_id AS src_pid, episode_id AS src_ep, "
"answered_by_podcast AS tgt_pid, answered_by_episode AS tgt_ep, COUNT(*) AS n "
"FROM questions "
"WHERE answered_by_podcast IS NOT NULL AND answered_by_episode IS NOT NULL "
"AND podcast_id != answered_by_podcast "
"GROUP BY podcast_id, episode_id, answered_by_podcast, answered_by_episode"
).fetchall()
for r in ans_rows:
cross_links["answers"].append({
"a": f"{r['src_pid']}:{r['src_ep']}",
"b": f"{r['tgt_pid']}:{r['tgt_ep']}",
"n": r["n"],
})
db.close()
return {
"podcasts": podcasts,
"theme_clusters": theme_clusters,
"themes": themes,
"episodes": episodes,
"top_quotes": top_quotes,
"cross_links": cross_links,
}
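# The "a"/"b" endpoints of every cross link are "podcast_id:episode_id"
# strings; the frontend is expected to resolve them against the node ids it
# derives from `episodes` above.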
@app.get("/api/analyses/cross-themes")
def get_cross_themes():
"""Cross-Podcast-Themen-Cluster (#8/#10): Themen aus verschiedenen Podcasts,
die semantisch zusammengehoeren."""
path = Path(DATA_DIR) / "theme_clusters.json"
if not path.exists():
return {"available": False}
try:
with open(path) as f:
data = json.load(f)
except Exception:
return {"available": False}
return {"available": True, **data}
@app.get("/api/analyses/density")
def get_density(podcast_id: Optional[str] = None, bins: int = 20):
"""Faktendichte (#16): claims-Verteilung je Episode in N Bins ueber die Paragraph-Achse."""
db = get_db()
if not _table_exists(db, "claims"):
db.close()
return {"available": False, "episodes": []}
where = "WHERE 1=1"
params = []
if podcast_id:
where += " AND e.podcast_id = ?"
params.append(podcast_id)
rows = db.execute(
f"""
SELECT e.podcast_id, e.id AS episode_id, e.title, e.guest, e.staffel,
(SELECT MAX(p.idx) FROM paragraphs p
WHERE p.podcast_id = e.podcast_id AND p.episode_id = e.id) AS max_para,
(SELECT COUNT(*) FROM paragraphs p
WHERE p.podcast_id = e.podcast_id AND p.episode_id = e.id) AS n_para
FROM episodes e
{where}
ORDER BY e.podcast_id, e.id
""",
params,
).fetchall()
out = []
for r in rows:
n_para = r["n_para"] or 0
max_para = r["max_para"] or 0
if n_para == 0:
continue
claim_rows = db.execute(
"SELECT paragraph_idx, claim_type, verifiable FROM claims "
"WHERE podcast_id = ? AND episode_id = ?",
(r["podcast_id"], r["episode_id"]),
).fetchall()
total_claims = len(claim_rows)
verifiable = sum(1 for c in claim_rows if c["verifiable"])
bin_counts = [0] * bins
bin_verifiable = [0] * bins
denom = max(max_para, 1)
for c in claim_rows:
idx = c["paragraph_idx"] or 0
b = min(bins - 1, int(idx * bins / (denom + 1)))
bin_counts[b] += 1
if c["verifiable"]:
bin_verifiable[b] += 1
out.append({
"podcast_id": r["podcast_id"],
"episode_id": r["episode_id"],
"title": r["title"],
"guest": r["guest"],
"staffel": r["staffel"],
"n_paragraphs": n_para,
"total_claims": total_claims,
"verifiable_claims": verifiable,
"density_bins": bin_counts,
"verifiable_bins": bin_verifiable,
"claims_per_para": (total_claims / n_para) if n_para else 0.0,
})
db.close()
return {"available": True, "bins": bins, "episodes": out}
@app.get("/api/analyses/debates")
def get_debates(topic: Optional[str] = None, source_podcast: Optional[str] = None,
target_podcast: Optional[str] = None, limit: int = 200):
"""Cross-Podcast-Debatten (#18): kuratierte Gegenueberstellungen je Thema."""
db = get_db()
if not _table_exists(db, "debates"):
db.close()
return {"available": False, "debates": []}
sql = (
"SELECT d.id, d.topic, d.agreement, d.divergence, d.insight, d.score, "
"d.source_podcast, d.source_episode, d.source_idx, "
"d.target_podcast, d.target_episode, d.target_idx, "
"p1.text AS source_text, p1.start_time AS source_start, "
"p2.text AS target_text, p2.start_time AS target_start, "
"pc1.name AS source_pname, pc2.name AS target_pname, "
"e1.title AS source_title, e1.guest AS source_guest, "
"e2.title AS target_title, e2.guest AS target_guest "
"FROM debates d "
"JOIN paragraphs p1 ON d.source_podcast = p1.podcast_id AND d.source_episode = p1.episode_id AND d.source_idx = p1.idx "
"JOIN paragraphs p2 ON d.target_podcast = p2.podcast_id AND d.target_episode = p2.episode_id AND d.target_idx = p2.idx "
"JOIN podcasts pc1 ON d.source_podcast = pc1.id "
"JOIN podcasts pc2 ON d.target_podcast = pc2.id "
"JOIN episodes e1 ON d.source_podcast = e1.podcast_id AND d.source_episode = e1.id "
"JOIN episodes e2 ON d.target_podcast = e2.podcast_id AND d.target_episode = e2.id "
"WHERE d.topic IS NOT NULL AND d.topic != 'error' AND d.topic != ''"
)
params = []
if topic:
sql += " AND d.topic LIKE ?"
params.append(f"%{topic}%")
if source_podcast:
sql += " AND d.source_podcast = ?"
params.append(source_podcast)
if target_podcast:
sql += " AND d.target_podcast = ?"
params.append(target_podcast)
sql += " ORDER BY d.score DESC LIMIT ?"
params.append(limit)
rows = db.execute(sql, params).fetchall()
topics = db.execute(
"SELECT topic, COUNT(*) c FROM debates "
"WHERE topic IS NOT NULL AND topic != 'error' AND topic != '' "
"GROUP BY topic ORDER BY c DESC LIMIT 30"
).fetchall()
podcasts = db.execute("SELECT id, name FROM podcasts").fetchall()
db.close()
return {
"available": True,
"podcasts": [dict(p) for p in podcasts],
"topics": [{"topic": t["topic"], "count": t["c"]} for t in topics],
"debates": [dict(r) for r in rows],
}
@app.get("/api/analyses/arguments")
def get_arguments(relation: Optional[str] = None, podcast_id: Optional[str] = None,
episode_id: Optional[str] = None, source_podcast: Optional[str] = None,
target_podcast: Optional[str] = None, limit: int = 200):
"""Argumentketten (#13): klassifizierte Relationen zwischen Absatz-Paaren."""
db = get_db()
if not _table_exists(db, "argument_links"):
db.close()
return {"available": False, "links": []}
sql = (
"SELECT a.id, a.relation, a.confidence, a.explanation, a.score, "
"a.source_podcast, a.source_episode, a.source_idx, "
"a.target_podcast, a.target_episode, a.target_idx, "
"p1.text AS source_text, p1.start_time AS source_start, "
"p2.text AS target_text, p2.start_time AS target_start, "
"e1.title AS source_title, e1.guest AS source_guest, "
"e2.title AS target_title, e2.guest AS target_guest "
"FROM argument_links a "
"JOIN paragraphs p1 ON a.source_podcast = p1.podcast_id AND a.source_episode = p1.episode_id AND a.source_idx = p1.idx "
"JOIN paragraphs p2 ON a.target_podcast = p2.podcast_id AND a.target_episode = p2.episode_id AND a.target_idx = p2.idx "
"JOIN episodes e1 ON a.source_podcast = e1.podcast_id AND a.source_episode = e1.id "
"JOIN episodes e2 ON a.target_podcast = e2.podcast_id AND a.target_episode = e2.id "
"WHERE a.relation IS NOT NULL AND a.relation != 'error' "
# Wortwoertlich identische Paare als Sicherheitsnetz fuer kuenftige Re-Runs nicht zeigen.
"AND NOT (a.relation = 'gleicher_punkt' AND p1.text = p2.text)"
)
params = []
if relation:
sql += " AND a.relation = ?"
params.append(relation)
if podcast_id:
sql += " AND (a.source_podcast = ? OR a.target_podcast = ?)"
params.extend([podcast_id, podcast_id])
if episode_id:
sql += " AND (a.source_episode = ? OR a.target_episode = ?)"
params.extend([episode_id, episode_id])
if source_podcast:
sql += " AND a.source_podcast = ?"
params.append(source_podcast)
if target_podcast:
sql += " AND a.target_podcast = ?"
params.append(target_podcast)
sql += " ORDER BY a.confidence DESC, a.score DESC LIMIT ?"
params.append(limit)
rows = db.execute(sql, params).fetchall()
relations = db.execute(
"SELECT relation, COUNT(*) c FROM argument_links "
"WHERE relation IS NOT NULL AND relation != 'error' "
"GROUP BY relation ORDER BY c DESC"
).fetchall()
podcasts = db.execute("SELECT id, name FROM podcasts").fetchall()
db.close()
return {
"available": True,
"podcasts": [dict(p) for p in podcasts],
"relations": [{"relation": r["relation"], "count": r["c"]} for r in relations],
"links": [dict(r) for r in rows],
}
@app.get("/api/search")
def search(q: str = Query(..., min_length=2), podcast_id: Optional[str] = None, limit: int = 50):
"""Full-text search across all transcripts."""
db = get_db()
q_like = f"%{q}%"
if podcast_id:
rows = db.execute(
"SELECT p.podcast_id, p.episode_id, p.idx, p.start_time, p.text, e.title, e.guest "
"FROM paragraphs p JOIN episodes e ON p.podcast_id = e.podcast_id AND p.episode_id = e.id "
"WHERE p.podcast_id = ? AND p.text LIKE ? LIMIT ?",
(podcast_id, q_like, limit)
).fetchall()
else:
rows = db.execute(
"SELECT p.podcast_id, p.episode_id, p.idx, p.start_time, p.text, e.title, e.guest "
"FROM paragraphs p JOIN episodes e ON p.podcast_id = e.podcast_id AND p.episode_id = e.id "
"WHERE p.text LIKE ? LIMIT ?",
(q_like, limit)
).fetchall()
db.close()
return [dict(r) for r in rows]
@app.get("/api/similar/{podcast_id}/{episode_id}/{para_idx}")
def find_similar(podcast_id: str, episode_id: str, para_idx: int,
limit: int = 10, cross_podcast: bool = False):
"""Find semantically similar paragraphs using embeddings."""
db = get_db()
row = db.execute(
"SELECT id, embedding FROM paragraphs WHERE podcast_id = ? AND episode_id = ? AND idx = ?",
(podcast_id, episode_id, para_idx)
).fetchone()
db.close()
if not row or not row["embedding"]:
raise HTTPException(404, "Paragraph not found or not embedded")
query_vec = np.frombuffer(row["embedding"], dtype=np.float32)
query_vec = query_vec / np.linalg.norm(query_vec)
# Load all embeddings
search_podcast = None if cross_podcast else podcast_id
vectors, meta = _load_embeddings(search_podcast)
if vectors is None or len(meta) == 0:
return []
# Cosine similarity (vectors are already normalized)
scores = vectors @ query_vec
# Get top results (skip self)
indices = np.argsort(scores)[::-1]
results = []
for idx in indices:
m = meta[idx]
# Skip self
if m["podcast_id"] == podcast_id and m["episode_id"] == episode_id and m["idx"] == para_idx:
continue
# Skip same episode unless cross_podcast
if not cross_podcast and m["episode_id"] == episode_id:
continue
results.append({
"podcast_id": m["podcast_id"],
"episode_id": m["episode_id"],
"paragraph_idx": m["idx"],
"score": float(scores[idx])
})
if len(results) >= limit:
break
# Enrich with text previews
db = get_db()
for r in results:
p = db.execute(
"SELECT text, start_time FROM paragraphs WHERE podcast_id = ? AND episode_id = ? AND idx = ?",
(r["podcast_id"], r["episode_id"], r["paragraph_idx"])
).fetchone()
if p:
r["text_preview"] = p["text"][:150]
r["start_time"] = p["start_time"]
ep = db.execute(
"SELECT title, guest FROM episodes WHERE podcast_id = ? AND id = ?",
(r["podcast_id"], r["episode_id"])
).fetchone()
if ep:
r["episode_title"] = ep["title"]
r["guest"] = ep["guest"]
db.close()
return results
@app.get("/api/similar-precomputed/{podcast_id}/{episode_id}/{para_idx}")
def get_precomputed_similar(podcast_id: str, episode_id: str, para_idx: int, limit: int = 10):
"""Get precomputed similar paragraphs (fast, no embedding computation)."""
db = get_db()
rows = db.execute(
"SELECT sl.target_podcast, sl.target_episode, sl.target_idx, sl.score, "
"p.text, p.start_time, e.title, e.guest "
"FROM semantic_links sl "
"JOIN paragraphs p ON sl.target_podcast = p.podcast_id AND sl.target_episode = p.episode_id AND sl.target_idx = p.idx "
"JOIN episodes e ON sl.target_podcast = e.podcast_id AND sl.target_episode = e.id "
"WHERE sl.podcast_id = ? AND sl.source_episode = ? AND sl.source_idx = ? "
"ORDER BY sl.score DESC LIMIT ?",
(podcast_id, episode_id, para_idx, limit)
).fetchall()
db.close()
return [{
"podcast_id": r["target_podcast"],
"episode_id": r["target_episode"],
"paragraph_idx": r["target_idx"],
"score": r["score"],
"text_preview": r["text"][:150],
"start_time": r["start_time"],
"episode_title": r["title"],
"guest": r["guest"],
} for r in rows]
@app.get("/api/compare")
def compare_podcasts(a: str = Query(...), b: str = Query(...)):
"""Compare two podcasts: shared topics, stats, cross-links."""
db = get_db()
# Basic stats
stats = {}
for pid in (a, b):
podcast = db.execute("SELECT * FROM podcasts WHERE id = ?", (pid,)).fetchone()
if not podcast:
raise HTTPException(404, f"Podcast '{pid}' not found")
ep_count = db.execute("SELECT COUNT(*) as c FROM episodes WHERE podcast_id = ?", (pid,)).fetchone()["c"]
q_count = db.execute("SELECT COUNT(*) as c FROM quotes WHERE podcast_id = ?", (pid,)).fetchone()["c"]
p_count = db.execute("SELECT COUNT(*) as c FROM paragraphs WHERE podcast_id = ?", (pid,)).fetchone()["c"]
stats[pid] = {"name": podcast["name"], "episodes": ep_count, "quotes": q_count, "paragraphs": p_count}
# Shared topics via topic tags
topics_a = db.execute(
"SELECT DISTINCT t.tag FROM topics t JOIN paragraphs p ON t.paragraph_id = p.id WHERE p.podcast_id = ?", (a,)
).fetchall()
topics_b = db.execute(
"SELECT DISTINCT t.tag FROM topics t JOIN paragraphs p ON t.paragraph_id = p.id WHERE p.podcast_id = ?", (b,)
).fetchall()
set_a = {r["tag"] for r in topics_a}
set_b = {r["tag"] for r in topics_b}
shared = sorted(set_a & set_b)
only_a = sorted(set_a - set_b)
only_b = sorted(set_b - set_a)
# Cross-podcast semantic links count
cross_links = 0
top_links = []
try:
cross_links = db.execute(
"SELECT COUNT(*) as c FROM semantic_links WHERE "
"(podcast_id = ? AND target_podcast = ?) OR (podcast_id = ? AND target_podcast = ?)",
(a, b, b, a)
).fetchone()["c"]
top_links = db.execute(
"SELECT sl.*, p1.text as source_text, p2.text as target_text, "
"e1.title as source_title, e2.title as target_title "
"FROM semantic_links sl "
"JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx "
"JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx "
"JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id "
"JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id "
"WHERE (sl.podcast_id = ? AND sl.target_podcast = ?) OR (sl.podcast_id = ? AND sl.target_podcast = ?) "
"ORDER BY sl.score DESC LIMIT 20",
(a, b, b, a)
).fetchall()
except Exception:
pass # semantic_links table may not exist yet
db.close()
return {
"stats": stats,
"shared_topics": shared,
"only_in": {a: only_a, b: only_b},
"cross_links_count": cross_links,
"top_cross_links": [{
"source_podcast": r["podcast_id"], "source_episode": r["source_episode"],
"source_text": r["source_text"][:150], "source_title": r["source_title"],
"target_podcast": r["target_podcast"], "target_episode": r["target_episode"],
"target_text": r["target_text"][:150], "target_title": r["target_title"],
"score": r["score"]
} for r in top_links]
}
@app.get("/api/semantic-search")
def semantic_search(q: str = Query(..., min_length=3), podcast_id: Optional[str] = None, limit: int = 20):
"""Semantic search using query embedding."""
from embeddings import embed_texts
try:
query_vec = np.array(embed_texts([q])[0], dtype=np.float32)
query_vec = query_vec / np.linalg.norm(query_vec)
except Exception as e:
raise HTTPException(500, f"Embedding failed: {e}")
vectors, meta = _load_embeddings(podcast_id)
if vectors is None:
return []
scores = vectors @ query_vec
indices = np.argsort(scores)[::-1][:limit]
db = get_db()
results = []
for idx in indices:
m = meta[idx]
score = float(scores[idx])
if score < 0.3:
break
p = db.execute(
"SELECT text, start_time FROM paragraphs WHERE id = ?", (m["id"],)
).fetchone()
ep = db.execute(
"SELECT title, guest FROM episodes WHERE podcast_id = ? AND id = ?",
(m["podcast_id"], m["episode_id"])
).fetchone()
results.append({
"podcast_id": m["podcast_id"],
"episode_id": m["episode_id"],
"paragraph_idx": m["idx"],
"score": score,
"text_preview": p["text"][:200] if p else "",
"start_time": p["start_time"] if p else None,
"episode_title": ep["title"] if ep else "",
"guest": ep["guest"] if ep else "",
})
db.close()
return results
# ── Startup ──
@app.on_event("startup")
def startup():
    init_db()
    # Auto-import podcasts from data directory
    data_path = Path(DATA_DIR)
    if data_path.exists():
        for podcast_dir in data_path.iterdir():
            if not podcast_dir.is_dir():
                continue
            mindmap_file = podcast_dir / "mindmap_data.json"
            srt_file = podcast_dir / "srt_index.json"
            if mindmap_file.exists() and srt_file.exists():
                podcast_id = podcast_dir.name
                db = get_db()
                existing = db.execute("SELECT id FROM podcasts WHERE id = ?", (podcast_id,)).fetchone()
                db.close()
                if not existing:
                    print(f"Importing podcast: {podcast_id}")
                    with open(mindmap_file) as f:
                        mindmap_data = json.load(f)
                    with open(srt_file) as f:
                        srt_index = json.load(f)
                    from database import import_podcast
                    import_podcast(podcast_id, mindmap_data, srt_index)

# ── Static Files + Audio ──
# Mount audio directory (per-podcast subdirs)
if os.path.isdir(AUDIO_DIR):
    app.mount("/audio", StaticFiles(directory=AUDIO_DIR), name="audio")


# SPA routing: try static files first, otherwise fall back to index.html.
# This keeps deep links such as /ldn or /neu-denken working (issue #10).
@app.get("/")
def spa_root():
    index = Path(STATIC_DIR) / "index.html"
    if index.is_file():
        return FileResponse(str(index))
    raise HTTPException(404)


@app.get("/{path:path}")
def spa_fallback(path: str):
    if path.startswith("api/") or path.startswith("audio/"):
        raise HTTPException(404)
    static_path = Path(STATIC_DIR) / path
    if static_path.is_file():
        return FileResponse(str(static_path))
    index = Path(STATIC_DIR) / "index.html"
    if index.is_file():
        return FileResponse(str(index))
    raise HTTPException(404)
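

# Local entry point, a convenience sketch: assumes uvicorn is available in the
# environment (the containerized deployment may start the app differently).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)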