# podcast-mindmap/scripts/extract_claims.py

#!/usr/bin/env python3
"""#16 Claim-Verification-Layer: extract verifiable claims from podcast transcripts.

Usage:
    DASHSCOPE_API_KEY=... python3 extract_claims.py [db-path]
"""
import json
import os
import sys
import time
import sqlite3
from openai import OpenAI

# SQLite database path; optional first CLI argument, defaults to data/db.sqlite.
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
# DashScope's OpenAI-compatible endpoint (international region).
BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
MODEL = "qwen-turbo"  # cheaper model for bulk processing
BATCH_SIZE = 3  # paragraphs per API call

# Prompt is intentionally German — the transcripts are German-language podcasts.
# It instructs the model to return ONLY a JSON array, one object per paragraph.
SYSTEM_PROMPT = """Du bist ein Faktenprüfer. Du erhältst Podcast-Transkript-Absätze.
Extrahiere ALLE prüfbaren faktischen Behauptungen (Zahlen, Statistiken, kausale Aussagen, Verweise auf Studien/Gesetze).
KEINE Meinungen, Bewertungen oder rhetorische Fragen.
Antworte NUR mit einem JSON-Array. Für jeden Absatz ein Objekt:
[{"paragraph_idx": 0, "claims": [{"text": "Die Behauptung", "type": "statistic|causal|reference|number", "verifiable": true}]}]
Wenn ein Absatz keine prüfbaren Claims enthält: {"paragraph_idx": 0, "claims": []}"""
def extract_batch(client, paragraphs):
    """Extract verifiable claims from a batch of transcript paragraphs.

    Sends all paragraphs in one chat-completion call and parses the model's
    JSON answer.

    Args:
        client: OpenAI-compatible client exposing ``chat.completions.create``.
        paragraphs: list of dicts with keys ``episode_id``, ``start_time``
            (seconds, numeric) and ``text``.

    Returns:
        A list of dicts shaped like ``{"paragraph_idx": int, "claims": [...]}``.
        On any API or parse failure, returns one empty-claims entry per input
        paragraph, each carrying an ``"error"`` key, so the caller can keep
        going (best-effort batch job).
    """
    # Runtime prompt text stays German (German-language podcasts); paragraphs
    # are truncated to 600 chars to bound token usage.
    sections = [
        f"\n--- Absatz {i} ({p['episode_id']}, {p['start_time']:.0f}s) ---\n{p['text'][:600]}\n"
        for i, p in enumerate(paragraphs)
    ]
    user_msg = "".join(sections)
    try:
        resp = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_msg},
            ],
            temperature=0.1,
            max_tokens=1000,
        )
        content = resp.choices[0].message.content.strip()
        # Strip an optional Markdown code fence (``` or ```json) around the JSON.
        if content.startswith("```"):
            content = content.split("```")[1].strip()
            if content.startswith("json"):
                content = content[4:].strip()
        parsed = json.loads(content)
        # Models occasionally return a single object instead of an array;
        # normalize to a list so the caller can always enumerate entries.
        if isinstance(parsed, dict):
            parsed = [parsed]
        return parsed
    except Exception as e:  # broad on purpose: one bad batch must not kill the run
        return [{"paragraph_idx": i, "claims": [], "error": str(e)} for i in range(len(paragraphs))]
def main():
    """Extract claims for all unprocessed transcript paragraphs into SQLite.

    Reads rows from the ``paragraphs`` table, sends them to the LLM in
    batches of BATCH_SIZE, and inserts the extracted claims into a
    ``claims`` table (created on demand). Resumable: paragraphs that
    already have claim rows are skipped. Exits with status 1 when
    DASHSCOPE_API_KEY is unset.
    """
    if not API_KEY:
        print("DASHSCOPE_API_KEY nicht gesetzt.")
        sys.exit(1)
    client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
    db = sqlite3.connect(DB_PATH)
    db.row_factory = sqlite3.Row
    # Create output table and indexes (idempotent).
    db.executescript("""
    CREATE TABLE IF NOT EXISTS claims (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        podcast_id TEXT, episode_id TEXT, paragraph_idx INTEGER,
        claim_text TEXT, claim_type TEXT, verifiable BOOLEAN,
        start_time REAL
    );
    CREATE INDEX IF NOT EXISTS idx_claims_episode ON claims(podcast_id, episode_id);
    CREATE INDEX IF NOT EXISTS idx_claims_type ON claims(claim_type);
    """)
    # Resume support: a paragraph counts as processed only if at least one
    # claim row exists for it.
    # NOTE(review): paragraphs whose batch legitimately yielded ZERO claims
    # never get a row here, so they are re-sent to the API on every run —
    # confirm whether a separate progress marker is wanted.
    processed_keys = set()
    try:
        for r in db.execute("SELECT DISTINCT podcast_id||'/'||episode_id||'/'||paragraph_idx as k FROM claims").fetchall():
            processed_keys.add(r["k"])
    except Exception:
        pass  # tolerate a missing/empty claims table on first run
    # Fetch all paragraphs in deterministic order.
    rows = db.execute(
        "SELECT id, podcast_id, episode_id, idx, start_time, text FROM paragraphs ORDER BY podcast_id, episode_id, idx"
    ).fetchall()
    # Filter out already-processed paragraphs.
    todo = [r for r in rows if f"{r['podcast_id']}/{r['episode_id']}/{r['idx']}" not in processed_keys]
    print(f"Extrahiere Claims: {len(todo)} Absätze zu verarbeiten ({len(rows) - len(todo)} bereits fertig)")
    total_claims = 0
    for i in range(0, len(todo), BATCH_SIZE):
        batch = todo[i:i + BATCH_SIZE]
        # start_time may be NULL in the DB; coalesce to 0 for the prompt.
        paras = [{"episode_id": r["episode_id"], "start_time": r["start_time"] or 0, "text": r["text"]} for r in batch]
        results = extract_batch(client, paras)
        for j, result in enumerate(results):
            if j >= len(batch):
                break  # model returned more entries than paragraphs sent
            row = batch[j]
            for claim in result.get("claims", []):
                db.execute(
                    "INSERT INTO claims (podcast_id, episode_id, paragraph_idx, claim_text, claim_type, verifiable, start_time) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (row["podcast_id"], row["episode_id"], row["idx"],
                     claim.get("text", ""), claim.get("type", "unknown"),
                     claim.get("verifiable", True), row["start_time"])
                )
                total_claims += 1
        # Commit and log progress every 20 batches so an abort loses little work.
        if (i // BATCH_SIZE) % 20 == 0:
            db.commit()
            print(f" {min(i + BATCH_SIZE, len(todo))}/{len(todo)} Absätze, {total_claims} Claims bisher")
        time.sleep(0.2)  # crude rate limiting between API calls
    db.commit()
    # Summary statistics — computed over the WHOLE claims table, so counts
    # include claims from earlier runs, not just this invocation.
    stats = db.execute("SELECT claim_type, COUNT(*) as c FROM claims GROUP BY claim_type ORDER BY c DESC").fetchall()
    podcast_stats = db.execute("SELECT podcast_id, COUNT(*) as c FROM claims GROUP BY podcast_id").fetchall()
    print(f"\nFertig: {total_claims} Claims extrahiert.")
    print("Nach Typ:")
    for s in stats:
        print(f" {s['claim_type']}: {s['c']}")
    print("Nach Podcast:")
    for s in podcast_stats:
        print(f" {s['podcast_id']}: {s['c']}")
    db.close()


if __name__ == "__main__":
    main()