#!/usr/bin/env python3
"""Batch transcription with word-accurate timestamps via mlx-whisper.

Produces one JSON file per episode containing word-level timing.
Runs on Apple Silicon (mlx-metal).

Usage:
    python3 transcribe_words.py /path/to/audio/ /path/to/output/
    python3 transcribe_words.py /path/to/audio/S1E1-Wachstum.m4a   # single file

Model: whisper-large-v3-turbo (fast + accurate, ~1.5 GB VRAM)
"""

import json
import os
import sys
import time
from pathlib import Path

# ── Config ──
MODEL = "mlx-community/whisper-large-v3-turbo"
LANGUAGE = "de"
AUDIO_EXTENSIONS = {".m4a", ".mp3", ".wav", ".flac", ".ogg", ".opus"}


def _word_entry(word: dict) -> dict:
    """Reduce one raw word record to its stripped text and rounded timing."""
    return {
        "word": word["word"].strip(),
        "start": round(word["start"], 3),
        "end": round(word["end"], 3),
    }


def _segment_entry(segment: dict) -> dict:
    """Reduce one raw segment to rounded timing, stripped text and its words."""
    return {
        "start": round(segment["start"], 3),
        "end": round(segment["end"], 3),
        "text": segment["text"].strip(),
        "words": [_word_entry(w) for w in segment.get("words", [])],
    }


def transcribe_episode(audio_path: str, output_dir: str) -> dict:
    """Transcribe one episode with word timestamps and store it as JSON.

    The result is written to ``<output_dir>/<audio stem>.words.json``. If that
    file already exists and parses as a JSON object, it is returned as-is and
    no transcription happens.

    Args:
        audio_path: Path of the audio file to transcribe.
        output_dir: Existing directory that receives the JSON file.

    Returns:
        The dictionary that was written (or read from the existing file).

    Raises:
        ImportError: If ``mlx_whisper`` is not installed and a transcription
            is actually required.
        KeyError: If a segment or word record lacks an expected field.
        OSError: If the output file cannot be written.
    """
    name = Path(audio_path).stem
    output_file = Path(output_dir) / f"{name}.words.json"

    # Skip if already present. A file that cannot be read or parsed (e.g. left
    # behind by an interrupted run) is treated as missing and regenerated.
    if output_file.exists():
        try:
            cached = json.loads(output_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            cached = None
        if isinstance(cached, dict):
            print(f" ⏭ {name} — bereits vorhanden, überspringe")
            return cached
        print(f" ⚠ {name} — vorhandene Datei unlesbar, transkribiere neu")

    # Imported only when a transcription is really needed, so episodes that
    # are already done never load the library.
    import mlx_whisper

    print(f" ▶ {name} — transkribiere…")
    t0 = time.time()

    result = mlx_whisper.transcribe(
        audio_path,
        path_or_hf_repo=MODEL,
        language=LANGUAGE,
        word_timestamps=True,
        verbose=False,
        condition_on_previous_text=True,
        # Prompt text names the show, its host and its topics.
        initial_prompt=(
            "NEU DENKEN Podcast mit Maja Göpel. "
            "Themen: Wirtschaft, Demokratie, Sicherheit, Freiheit."
        ),
    )

    elapsed = time.time() - t0

    # Keep the segment level (for paragraph mapping); words live inside it.
    segments = [_segment_entry(seg) for seg in result.get("segments", [])]
    word_count = sum(len(seg["words"]) for seg in segments)

    output = {
        "episode": name,
        "model": MODEL,
        "language": LANGUAGE,
        # NOTE: wall-clock time spent transcribing, not the audio length.
        "duration_seconds": round(elapsed, 1),
        "word_count": word_count,
        "segment_count": len(segments),
        "segments": segments,
    }

    # Write to a sibling temp file and move it into place, so an interrupted
    # run cannot leave a truncated file that later looks like a finished one.
    tmp_file = output_file.with_name(output_file.name + ".tmp")
    tmp_file.write_text(
        json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    os.replace(tmp_file, output_file)

    print(f" ✓ {name} — {word_count} Wörter, {len(segments)} Segmente, {elapsed:.0f}s")
    return output


def main():
    """Command-line entry point: transcribe one file or a whole directory.

    ``sys.argv[1]`` is an audio file or a directory of audio files;
    ``sys.argv[2]`` is the optional output directory. Without it, results go
    next to the input (the directory itself, or the file's parent).

    Exits with status 1 when the argument is missing, the input does not
    exist, or no audio files are found. Failures of individual episodes are
    reported and do not stop the batch.
    """
    if len(sys.argv) < 2:
        print(
            f"Nutzung: {sys.argv[0]} <audio-datei-oder-verzeichnis> "
            "[output-verzeichnis]"
        )
        sys.exit(1)

    input_path = Path(sys.argv[1])

    # Single file or directory? Validate before touching the file system.
    if input_path.is_file():
        files = [input_path]
        default_output = input_path.parent
    elif input_path.is_dir():
        files = sorted(
            f
            for f in input_path.iterdir()
            if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS
        )
        default_output = input_path
    else:
        print(f"Fehler: {input_path} existiert nicht.")
        sys.exit(1)

    if not files:
        print("Keine Audio-Dateien gefunden.")
        sys.exit(1)

    output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else default_output
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Transkribiere {len(files)} Dateien → {output_dir}/")
    print(f"Modell: {MODEL}")
    print()

    total_t0 = time.time()
    results = []

    for i, f in enumerate(files, 1):
        print(f"[{i}/{len(files)}] {f.name}")
        try:
            result = transcribe_episode(str(f), str(output_dir))
        except Exception as e:
            # Deliberate best-effort: one bad episode must not abort the batch.
            print(f" ✗ FEHLER: {e}")
        else:
            results.append(result)

    total_elapsed = time.time() - total_t0
    total_words = sum(r.get("word_count", 0) for r in results)
    print()
    print(
        f"Fertig: {len(results)}/{len(files)} Episoden, "
        f"{total_words} Wörter, {total_elapsed:.0f}s gesamt"
    )


if __name__ == "__main__":
    main()