#!/usr/bin/env python3
"""Importiert Wort-Level-Timestamps in die SQLite-Datenbank.

Liest *.words.json-Dateien und schreibt in die Tabelle `words`.

Nutzung:
    python3 import_words.py <podcast_id> <words-json-verzeichnis> [db-pfad]

Beispiel:
    python3 import_words.py neu-denken ../data/neu-denken/words/ ../data/db.sqlite
"""
import json
import os
import sqlite3
import sys
from contextlib import closing
from pathlib import Path
def init_words_table(db):
    """Create the `words` table and its lookup indexes if missing.

    Safe to call repeatedly: every statement uses IF NOT EXISTS.
    """
    # Schema kept in a named local so the call site stays readable.
    schema_sql = """
    CREATE TABLE IF NOT EXISTS words (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        podcast_id TEXT NOT NULL,
        episode_id TEXT NOT NULL,
        segment_idx INTEGER NOT NULL,
        word_idx INTEGER NOT NULL,
        word TEXT NOT NULL,
        start_time REAL NOT NULL,
        end_time REAL NOT NULL,
        UNIQUE(podcast_id, episode_id, segment_idx, word_idx)
    );
    CREATE INDEX IF NOT EXISTS idx_words_episode ON words(podcast_id, episode_id);
    CREATE INDEX IF NOT EXISTS idx_words_time ON words(podcast_id, episode_id, start_time);
    """
    db.executescript(schema_sql)
def import_words_file(db, podcast_id: str, words_file: Path) -> int:
    """Import a single *.words.json file into the `words` table.

    Expected JSON shape (from the transcription pipeline):
    {"episode": "<name>", "segments": [{"words": [{"word", "start", "end"}, ...]}, ...]}

    Existing rows for the episode are deleted first, so re-importing a
    file replaces rather than duplicates. Does not commit; the caller
    owns the transaction.

    Returns the number of word rows inserted.
    """
    # JSON is UTF-8 by spec; don't depend on the platform locale encoding.
    data = json.loads(words_file.read_text(encoding="utf-8"))
    episode_name = data["episode"]

    # Episode id is the part before the first dash: S1E1-Wachstum -> S1E1
    episode_id = episode_name.split("-")[0]

    # Drop any previously imported rows for this episode.
    db.execute(
        "DELETE FROM words WHERE podcast_id = ? AND episode_id = ?",
        (podcast_id, episode_id),
    )

    # Build all rows up front and insert as one batch; executemany avoids
    # the per-statement overhead of one execute() per word.
    rows = [
        (podcast_id, episode_id, seg_idx, word_idx, w["word"], w["start"], w["end"])
        for seg_idx, segment in enumerate(data.get("segments", []))
        for word_idx, w in enumerate(segment.get("words", []))
    ]
    db.executemany(
        "INSERT INTO words (podcast_id, episode_id, segment_idx, word_idx, word, start_time, end_time) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        rows,
    )
    return len(rows)
def main():
    """CLI entry point: import every *.words.json file from a directory.

    argv: <podcast_id> <words-verzeichnis> [db-pfad]. The database path
    falls back to the DB_PATH environment variable, then to
    "data/db.sqlite". Exits with status 1 on bad usage or when the
    directory contains no matching files.
    """
    if len(sys.argv) < 3:
        print(f"Nutzung: {sys.argv[0]} <podcast_id> <words-verzeichnis> [db-pfad]")
        sys.exit(1)

    podcast_id = sys.argv[1]
    words_dir = Path(sys.argv[2])
    db_path = sys.argv[3] if len(sys.argv) > 3 else os.environ.get("DB_PATH", "data/db.sqlite")

    # closing() guarantees the connection is released even when an import
    # raises or we sys.exit() mid-run (the original leaked the handle then).
    with closing(sqlite3.connect(db_path)) as db:
        init_words_table(db)

        files = sorted(words_dir.glob("*.words.json"))
        if not files:
            print(f"Keine *.words.json-Dateien in {words_dir} gefunden.")
            sys.exit(1)

        print(f"Importiere {len(files)} Dateien für Podcast '{podcast_id}'")

        total_words = 0
        for f in files:
            count = import_words_file(db, podcast_id, f)
            print(f" {f.stem}: {count} Wörter")
            total_words += count

        # Single commit at the end: all episodes land atomically.
        db.commit()

    print(f"Fertig: {total_words} Wörter importiert.")
# Script entry point (no side effects on import).
if __name__ == "__main__":
    main()