# podcast-mindmap/backend/database.py
"""SQLite database for podcast-mindmap: paragraphs, quotes, embeddings."""
import json
import sqlite3
import os
import numpy as np
from typing import Optional
DB_PATH = os.environ.get("DB_PATH", "/data/db.sqlite")
def get_db() -> sqlite3.Connection:
    """Open a connection to the application database.

    The connection is configured with:
      * ``sqlite3.Row`` row factory, so columns are accessible by name;
      * WAL journal mode, for better concurrent read/write behavior;
      * ``PRAGMA foreign_keys=ON`` — SQLite ships with foreign-key
        enforcement DISABLED per connection, so without this pragma the
        FOREIGN KEY clauses declared in ``init_db`` are silently ignored.

    The caller is responsible for closing the returned connection.
    """
    db = sqlite3.connect(DB_PATH)
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA journal_mode=WAL")
    db.execute("PRAGMA foreign_keys=ON")
    return db
def init_db():
    """Create all tables and indexes if they do not already exist.

    Idempotent: every statement uses IF NOT EXISTS, so this is safe to
    call on every application start.

    Schema overview:
      * podcasts   -- one row per podcast; config_json holds JSON settings
      * episodes   -- composite PK (podcast_id, id), scoped per podcast
      * paragraphs -- transcript paragraphs; `embedding` is a raw BLOB
                      (written as float32 bytes by store_embedding)
      * topics     -- (paragraph_id, tag) tag assignments with a score
      * quotes     -- curated quotes; themes_json is a JSON array of theme ids
      * staffeln   -- seasons ("Staffel" is German for season) with a color
      * themes     -- cross-episode themes; episodes_json is a JSON array
    """
    db = get_db()
    # One script keeps the whole schema in a single place; executescript
    # issues an implicit COMMIT before running the statements.
    db.executescript("""
    CREATE TABLE IF NOT EXISTS podcasts (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        host TEXT,
        description TEXT,
        config_json TEXT
    );
    CREATE TABLE IF NOT EXISTS episodes (
        id TEXT NOT NULL,
        podcast_id TEXT NOT NULL,
        title TEXT NOT NULL,
        guest TEXT,
        staffel INTEGER,
        youtube_id TEXT,
        audio_file TEXT,
        PRIMARY KEY (podcast_id, id),
        FOREIGN KEY (podcast_id) REFERENCES podcasts(id)
    );
    CREATE TABLE IF NOT EXISTS paragraphs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        podcast_id TEXT NOT NULL,
        episode_id TEXT NOT NULL,
        idx INTEGER NOT NULL,
        start_time REAL,
        end_time REAL,
        text TEXT NOT NULL,
        embedding BLOB,
        UNIQUE(podcast_id, episode_id, idx),
        FOREIGN KEY (podcast_id) REFERENCES podcasts(id)
    );
    CREATE TABLE IF NOT EXISTS topics (
        paragraph_id INTEGER NOT NULL,
        tag TEXT NOT NULL,
        score REAL DEFAULT 1.0,
        PRIMARY KEY (paragraph_id, tag),
        FOREIGN KEY (paragraph_id) REFERENCES paragraphs(id)
    );
    CREATE TABLE IF NOT EXISTS quotes (
        id TEXT NOT NULL,
        podcast_id TEXT NOT NULL,
        episode_id TEXT NOT NULL,
        text TEXT NOT NULL,
        verbatim TEXT,
        speaker TEXT,
        start_time REAL,
        end_time REAL,
        is_top_quote BOOLEAN DEFAULT 0,
        themes_json TEXT DEFAULT '[]',
        PRIMARY KEY (podcast_id, id),
        FOREIGN KEY (podcast_id) REFERENCES podcasts(id)
    );
    CREATE TABLE IF NOT EXISTS staffeln (
        id INTEGER NOT NULL,
        podcast_id TEXT NOT NULL,
        name TEXT NOT NULL,
        color TEXT DEFAULT '#666',
        PRIMARY KEY (podcast_id, id),
        FOREIGN KEY (podcast_id) REFERENCES podcasts(id)
    );
    CREATE TABLE IF NOT EXISTS themes (
        id TEXT NOT NULL,
        podcast_id TEXT NOT NULL,
        label TEXT NOT NULL,
        description TEXT,
        color TEXT DEFAULT '#666',
        episodes_json TEXT DEFAULT '[]',
        PRIMARY KEY (podcast_id, id),
        FOREIGN KEY (podcast_id) REFERENCES podcasts(id)
    );
    CREATE INDEX IF NOT EXISTS idx_paragraphs_podcast ON paragraphs(podcast_id, episode_id);
    CREATE INDEX IF NOT EXISTS idx_quotes_podcast ON quotes(podcast_id, episode_id);
    CREATE INDEX IF NOT EXISTS idx_topics_tag ON topics(tag);
    """)
    db.commit()
    db.close()
def import_podcast(podcast_id: str, mindmap_data: dict, srt_index: dict):
    """Import a podcast's data from mindmap_data.json + srt_index.json into the DB.

    Uses INSERT OR REPLACE throughout, so re-importing the same podcast is
    idempotent (rows are upserted, not duplicated). All inserts happen in a
    single transaction committed at the end; on failure nothing is persisted.

    Args:
        podcast_id: stable identifier used as the scope for all child rows.
        mindmap_data: parsed mindmap_data.json with keys such as
            "staffeln", "themes", "episodes", "quotes" (all optional).
        srt_index: parsed srt_index.json mapping episode keys like
            "S1E1-Wachstum" to {"paragraphs": [{"start", "end", "text"}, ...]}.
    """
    db = get_db()
    # try/finally guarantees the connection is closed even if an insert
    # raises (e.g. malformed input dict); previously it leaked on error.
    try:
        # Podcast
        db.execute(
            "INSERT OR REPLACE INTO podcasts (id, name, host, description) VALUES (?, ?, ?, ?)",
            (podcast_id, mindmap_data.get("name", ""), mindmap_data.get("host", ""),
             mindmap_data.get("description", ""))
        )
        # Staffeln (seasons)
        for s in mindmap_data.get("staffeln", []):
            db.execute(
                "INSERT OR REPLACE INTO staffeln (id, podcast_id, name, color) VALUES (?, ?, ?, ?)",
                (s["id"], podcast_id, s["name"], s.get("color", "#666"))
            )
        # Themes
        for t in mindmap_data.get("themes", []):
            db.execute(
                "INSERT OR REPLACE INTO themes (id, podcast_id, label, description, color, episodes_json) VALUES (?, ?, ?, ?, ?, ?)",
                (t["id"], podcast_id, t["label"], t.get("description", ""),
                 t.get("color", "#666"), json.dumps(t.get("episodes", [])))
            )
        # Episodes (youtube_id is not present in mindmap_data; column stays NULL)
        for ep in mindmap_data.get("episodes", []):
            db.execute(
                "INSERT OR REPLACE INTO episodes (id, podcast_id, title, guest, staffel, audio_file) VALUES (?, ?, ?, ?, ?, ?)",
                (ep["id"], podcast_id, ep["title"], ep.get("guest", ""),
                 ep.get("staffel"), ep.get("audioFile"))
            )
        # Quotes
        for q in mindmap_data.get("quotes", []):
            db.execute(
                "INSERT OR REPLACE INTO quotes (id, podcast_id, episode_id, text, verbatim, speaker, start_time, end_time, is_top_quote, themes_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (q["id"], podcast_id, q["episode"], q["text"], q.get("verbatim"),
                 q.get("speaker", ""), q.get("startTime"), q.get("endTime"),
                 q.get("isTopQuote", False), json.dumps(q.get("themes", [])))
            )
        # Paragraphs from srt_index; maxsplit=1 makes the intent explicit:
        # keep only the prefix before the first dash (S1E1-Wachstum -> S1E1).
        for ep_key, ep_data in srt_index.items():
            ep_id = ep_key.split("-", 1)[0]
            for i, p in enumerate(ep_data.get("paragraphs", [])):
                db.execute(
                    "INSERT OR REPLACE INTO paragraphs (podcast_id, episode_id, idx, start_time, end_time, text) VALUES (?, ?, ?, ?, ?, ?)",
                    (podcast_id, ep_id, i, p["start"], p["end"], p["text"])
                )
        db.commit()
    finally:
        db.close()
def get_all_embeddings(podcast_id: Optional[str] = None):
    """Load all stored paragraph embeddings plus their metadata.

    Args:
        podcast_id: if given, restrict to that podcast's paragraphs;
            otherwise load embeddings across all podcasts.

    Returns:
        (vectors, meta) where vectors is a float32 ndarray of shape
        (n, dim) with rows L2-normalized (ready for cosine similarity via
        dot product), and meta is a parallel list of dicts with keys
        "id", "podcast_id", "episode_id", "idx". Returns (None, []) when
        no embeddings exist.
    """
    # Build the filter once instead of duplicating the whole query.
    sql = ("SELECT id, podcast_id, episode_id, idx, embedding "
           "FROM paragraphs WHERE embedding IS NOT NULL")
    params: tuple = ()
    if podcast_id:
        sql += " AND podcast_id = ?"
        params = (podcast_id,)
    db = get_db()
    # Close the connection even if the query fails (was leaked on error).
    try:
        rows = db.execute(sql, params).fetchall()
    finally:
        db.close()
    if not rows:
        return None, []
    meta = [{"id": r["id"], "podcast_id": r["podcast_id"],
             "episode_id": r["episode_id"], "idx": r["idx"]} for r in rows]
    # Blobs were written as raw float32 bytes by store_embedding.
    vectors = np.array([np.frombuffer(r["embedding"], dtype=np.float32) for r in rows])
    # Normalize rows for cosine similarity; guard against zero vectors
    # (division by zero would yield NaNs).
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1
    vectors = vectors / norms
    return vectors, meta
def store_embedding(paragraph_id: int, embedding: list[float]):
    """Persist one paragraph's embedding as a raw float32 BLOB.

    The bytes are the little-endian float32 buffer produced by numpy;
    get_all_embeddings reads them back with np.frombuffer(dtype=float32).
    """
    payload = np.asarray(embedding, dtype=np.float32).tobytes()
    conn = get_db()
    conn.execute("UPDATE paragraphs SET embedding = ? WHERE id = ?", (payload, paragraph_id))
    conn.commit()
    conn.close()