#14/#15/#16/#17 backend: Endpoints fuer Gaps, Shifts, Claims und Questions

- /api/podcasts/{id}/episodes/{ep}/claims: Behauptungen einer Episode, optional gefiltert nach claim_type.
- /api/podcasts/{id}/episodes/{ep}/questions: Fragen der Episode, gefiltert nach Typ und Antwort-Status.
- /api/podcasts/{id}/episodes/{ep}/analyses-summary: Zaehler fuer die UI-Buttons (claims, questions, unbeantwortet).
- /api/analyses/gaps: Leerstellen aus data/gaps_analysis.json (#14), Filter ueber min_size und missing_in.
- /api/analyses/shifts: Narrative-Shift-Drift aus data/narrative_shifts.json (#15), Filter ueber podcast, theme und min_drift.
- Wort-Timestamps via /api/podcasts/{id}/transcript/{ep}/words; Tabelle wird via _table_exists graceful behandelt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dotty Dotter 2026-04-28 00:31:02 +02:00
parent f9d8c677ae
commit d6ccea006a

View File

@ -114,6 +114,135 @@ def get_words(podcast_id: str, episode_id: str):
} }
def _table_exists(db, name: str) -> bool:
return db.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,)
).fetchone() is not None
@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/claims")
def get_episode_claims(podcast_id: str, episode_id: str, claim_type: Optional[str] = None):
"""Claims (Behauptungen) für eine Episode."""
db = get_db()
if not _table_exists(db, "claims"):
db.close()
return {"available": False, "claims": []}
sql = ("SELECT id, paragraph_idx, claim_text, claim_type, verifiable, start_time "
"FROM claims WHERE podcast_id = ? AND episode_id = ?")
params = [podcast_id, episode_id]
if claim_type:
sql += " AND claim_type = ?"
params.append(claim_type)
sql += " ORDER BY paragraph_idx, id"
rows = db.execute(sql, params).fetchall()
db.close()
return {"available": True, "claims": [dict(r) for r in rows]}
@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/questions")
def get_episode_questions(podcast_id: str, episode_id: str, question_type: Optional[str] = None,
answered: Optional[str] = None):
"""Fragen einer Episode."""
db = get_db()
if not _table_exists(db, "questions"):
db.close()
return {"available": False, "questions": []}
sql = ("SELECT id, paragraph_idx, question_text, question_type, answered, "
"answered_by_podcast, answered_by_episode, answered_by_idx, start_time "
"FROM questions WHERE podcast_id = ? AND episode_id = ?")
params = [podcast_id, episode_id]
if question_type:
sql += " AND question_type = ?"
params.append(question_type)
if answered:
sql += " AND answered = ?"
params.append(answered)
sql += " ORDER BY paragraph_idx, id"
rows = db.execute(sql, params).fetchall()
db.close()
return {"available": True, "questions": [dict(r) for r in rows]}
@app.get("/api/podcasts/{podcast_id}/episodes/{episode_id}/analyses-summary")
def get_episode_analyses_summary(podcast_id: str, episode_id: str):
"""Zähler für die Analyse-Datentöpfe einer Episode (für UI-Buttons)."""
db = get_db()
out = {}
if _table_exists(db, "claims"):
out["claims"] = db.execute(
"SELECT COUNT(*) FROM claims WHERE podcast_id = ? AND episode_id = ?",
(podcast_id, episode_id)
).fetchone()[0]
if _table_exists(db, "questions"):
out["questions"] = db.execute(
"SELECT COUNT(*) FROM questions WHERE podcast_id = ? AND episode_id = ?",
(podcast_id, episode_id)
).fetchone()[0]
out["questions_unanswered"] = db.execute(
"SELECT COUNT(*) FROM questions WHERE podcast_id = ? AND episode_id = ? AND answered = 'no'",
(podcast_id, episode_id)
).fetchone()[0]
db.close()
return out
@app.get("/api/analyses/gaps")
def get_gaps_analysis(min_size: int = 0, missing_in: Optional[str] = None, limit: int = 200):
"""Leerstellen-Analyse (#14): Cluster, die in mindestens einem Podcast fehlen."""
path = Path(DATA_DIR) / "gaps_analysis.json"
if not path.exists():
return {"available": False}
try:
with open(path) as f:
data = json.load(f)
except Exception:
return {"available": False}
gaps = data.get("gaps", [])
if min_size > 0:
gaps = [g for g in gaps if g.get("cluster_size", 0) >= min_size]
if missing_in:
gaps = [g for g in gaps if g.get("missing_in") == missing_in]
gaps = gaps[:limit]
return {
"available": True,
"total_paragraphs": data.get("total_paragraphs"),
"podcasts": data.get("podcasts", []),
"n_clusters": data.get("n_clusters"),
"clusters": data.get("clusters", []),
"gaps": gaps,
}
@app.get("/api/analyses/shifts")
def get_shifts_analysis(podcast: Optional[str] = None, theme: Optional[str] = None,
min_drift: float = 0.0, limit: int = 200):
"""Narrative-Shift-Analyse (#15): Drift zwischen aufeinanderfolgenden Episoden je Theme."""
path = Path(DATA_DIR) / "narrative_shifts.json"
if not path.exists():
return {"available": False}
try:
with open(path) as f:
data = json.load(f)
except Exception:
return {"available": False}
shifts = data.get("shifts", [])
if podcast:
shifts = [s for s in shifts if s.get("podcast") == podcast]
if theme:
shifts = [s for s in shifts if s.get("theme") == theme]
if min_drift > 0:
shifts = [s for s in shifts if s.get("max_drift", 0) >= min_drift]
shifts = shifts[:limit]
podcasts = sorted({s.get("podcast") for s in data.get("shifts", []) if s.get("podcast")})
return {
"available": True,
"total_themes_tracked": data.get("total_themes_tracked"),
"themes": data.get("themes", []),
"podcasts": podcasts,
"shifts": shifts,
}
@app.get("/api/search") @app.get("/api/search")
def search(q: str = Query(..., min_length=2), podcast_id: Optional[str] = None, limit: int = 50): def search(q: str = Query(..., min_length=2), podcast_id: Optional[str] = None, limit: int = 50):
"""Full-text search across all transcripts.""" """Full-text search across all transcripts."""