podcast-mindmap/scripts/analyse_arguments.py
Dotty Dotter 839ae2c27e #13/#18 More robust JSON parser + --rerun-errors mode
- scripts/json_utils.py: parse_llm_json() with code-fence stripping, balanced-brace extraction, truncation repair, inner-quote escaping and trailing-comma stripping (see the sketch below).
- scripts/analyse_arguments.py + scripts/curate_debates.py: use the new parser, retry up to three times on network/rate-limit errors, gain a --rerun-errors path for repairing existing error records, and set busy_timeout=60s against SQLite locks.
- scripts/rerun_errors.py: standalone re-runner for both tables (debates.topic='error' and argument_links.relation='error') with a budget limit; keeps ids via UPDATE.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 00:30:45 +02:00
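
The new scripts/json_utils.py itself is not shown on this page. As a minimal, hypothetical sketch of what a parse_llm_json() with the repair steps listed above might look like (the expect="object" parameter is taken from the call sites below; the inner-quote-escape step is omitted for brevity):

import json
import re

def parse_llm_json(text, expect="object"):
    """Best-effort parse of a JSON object embedded in an LLM reply (sketch)."""
    # Strip Markdown code fences such as ```json ... ```.
    text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip())
    # Extract the first balanced {...} block (naive: ignores braces
    # inside string literals).
    start = text.find("{")
    if start == -1:
        raise ValueError("no JSON object found")
    depth, end = 0, None
    for i, ch in enumerate(text[start:], start):
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    # Truncation repair: close braces the model never emitted.
    candidate = text[start:end] if end else text[start:] + "}" * depth
    # Try as-is, then with trailing commas stripped.
    for attempt in (candidate, re.sub(r",\s*([}\]])", r"\1", candidate)):
        try:
            obj = json.loads(attempt)
            if expect == "object" and not isinstance(obj, dict):
                raise ValueError("expected a JSON object")
            return obj
        except json.JSONDecodeError:
            continue
    raise ValueError(f"unparseable JSON: {candidate[:80]!r}")

With this sketch, a truncated reply such as '```json\n{"relation": "belegt", "confidence": 0.8' would have its fence stripped, its missing brace appended, and then parse cleanly, matching the truncation-repair behaviour the commit message describes.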


#!/usr/bin/env python3
"""#13 Argument-chain tracker: classify logical relations between semantically similar paragraphs.

Takes the top-N semantic_links and has Qwen classify the relation:
erweitert, widerspricht, belegt, relativiert, gleicher_punkt, kein_bezug.

Usage:
    DASHSCOPE_API_KEY=... python3 analyse_arguments.py [db-path] [limit] [--rerun-errors]
"""
import json
import os
import sys
import time
import sqlite3
from openai import OpenAI
# Local helper
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from json_utils import parse_llm_json
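# parse_llm_json() strips code fences, extracts the first balanced JSON object
# and repairs truncation/trailing-comma artefacts; it raises ValueError when
# nothing parseable remains (see the commit message and the sketch above).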
# Positional args: [db-path] [limit]; flags are filtered out first so that
# --rerun-errors is not mistaken for the database path.
ARGS = [a for a in sys.argv[1:] if not a.startswith("--")]
DB_PATH = ARGS[0] if ARGS else "data/db.sqlite"
LIMIT = int(ARGS[1]) if len(ARGS) > 1 else 500
RERUN_ERRORS = "--rerun-errors" in sys.argv
API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
MODEL = "qwen-plus"
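# qwen-plus via the OpenAI-compatible DashScope endpoint (international region).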
SYSTEM_PROMPT = """You are a discourse analyst. You receive two text passages from podcast transcripts.
Classify the logical relation between them. Answer ONLY with a JSON object:
{"relation": "...", "confidence": 0.0-1.0, "explanation": "one sentence of justification"}
Possible relations:
- "erweitert": B builds on A, supplements or deepens it
- "widerspricht": B contradicts A, raises a counter-argument
- "belegt": B provides evidence/data for A's claim
- "relativiert": B qualifies A, names exceptions/conditions
- "gleicher_punkt": A and B make essentially the same point
- "kein_bezug": no logical relation despite topical proximity"""
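# The relation labels are stored verbatim in argument_links.relation and
# matched elsewhere (e.g. WHERE relation = 'error' in --rerun-errors mode),
# so they must stay exactly as written here.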


def classify_pair(client, text_a, meta_a, text_b, meta_b):
    user_msg = f"""Paragraph A ({meta_a}):
"{text_a}"
Paragraph B ({meta_b}):
"{text_b}"
What logical relation holds from A to B?"""
    last_err = None
    for attempt in range(3):
        try:
            resp = client.chat.completions.create(
                model=MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_msg},
                ],
                temperature=0.1,
                max_tokens=200,
            )
            content = resp.choices[0].message.content
            usage = getattr(resp, "usage", None)
            tokens = (usage.prompt_tokens, usage.completion_tokens) if usage else (0, 0)
            try:
                parsed = parse_llm_json(content, expect="object")
                parsed["_tokens"] = tokens
                return parsed
            except ValueError as pe:
                last_err = f"parse: {pe}"
                # No retry on a parse error: the model would return the same output again.
                break
        except Exception as e:
            last_err = str(e)
            # Retry on network/rate-limit errors with exponential backoff.
            if attempt < 2:
                time.sleep(2 ** attempt)
                continue
            break
    return {"relation": "error", "confidence": 0, "explanation": str(last_err), "_tokens": (0, 0)}
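# Even on failure the caller receives a well-formed record with
# relation="error", so the pair is stored and can be repaired later
# via --rerun-errors.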


def main():
    if not API_KEY:
        print("DASHSCOPE_API_KEY is not set.")
        sys.exit(1)
    client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
    db = sqlite3.connect(DB_PATH, timeout=60.0)
    db.execute("PRAGMA busy_timeout=60000")
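    # The 60 s busy timeout (driver and PRAGMA) makes concurrent writers such
    # as curate_debates.py block instead of failing with "database is locked".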
    db.row_factory = sqlite3.Row
    # Create the output table
    db.executescript("""
        CREATE TABLE IF NOT EXISTS argument_links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_podcast TEXT, source_episode TEXT, source_idx INTEGER,
            target_podcast TEXT, target_episode TEXT, target_idx INTEGER,
            relation TEXT, confidence REAL, explanation TEXT, score REAL
        );
        CREATE INDEX IF NOT EXISTS idx_arglinks ON argument_links(relation);
    """)
    if RERUN_ERRORS:
        # Fetch the error records, delete them, and rebuild the input list from them.
        err_rows = db.execute("""
            SELECT al.source_podcast as podcast_id, al.source_episode, al.source_idx,
                   al.target_podcast, al.target_episode, al.target_idx, al.score,
                   p1.text as source_text, p2.text as target_text,
                   e1.title as source_title, e1.guest as source_guest,
                   e2.title as target_title, e2.guest as target_guest
            FROM argument_links al
            JOIN paragraphs p1 ON al.source_podcast = p1.podcast_id AND al.source_episode = p1.episode_id AND al.source_idx = p1.idx
            JOIN paragraphs p2 ON al.target_podcast = p2.podcast_id AND al.target_episode = p2.episode_id AND al.target_idx = p2.idx
            JOIN episodes e1 ON al.source_podcast = e1.podcast_id AND al.source_episode = e1.id
            JOIN episodes e2 ON al.target_podcast = e2.podcast_id AND al.target_episode = e2.id
            WHERE al.relation = 'error'
        """).fetchall()
        rows = err_rows
        del_count = db.execute("DELETE FROM argument_links WHERE relation='error'").rowcount
        db.commit()
        print(f"RE-RUN: {del_count} error records deleted, {len(rows)} will be reclassified.")
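        # DELETE + fresh INSERT below assigns new row ids; the standalone
        # scripts/rerun_errors.py keeps existing ids via UPDATE instead.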
        existing = set()
    else:
        # Get top semantic links (cross-episode, prefer cross-podcast)
        rows = db.execute("""
            SELECT sl.podcast_id, sl.source_episode, sl.source_idx,
                   sl.target_podcast, sl.target_episode, sl.target_idx, sl.score,
                   p1.text as source_text, p2.text as target_text,
                   e1.title as source_title, e1.guest as source_guest,
                   e2.title as target_title, e2.guest as target_guest
            FROM semantic_links sl
            JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx
            JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx
            JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id
            JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id
            WHERE sl.source_episode != sl.target_episode
            ORDER BY sl.score DESC
            LIMIT ?
        """, (LIMIT,)).fetchall()
        print(f"Classifying {len(rows)} pairs with {MODEL}")
        # Check already processed pairs; the '|' separator prevents key
        # collisions that plain concatenation would allow.
        existing = set()
        try:
            for r in db.execute("SELECT source_podcast||'|'||source_episode||'|'||source_idx||'|'||target_podcast||'|'||target_episode||'|'||target_idx as k FROM argument_links").fetchall():
                existing.add(r["k"])
        except Exception:
            pass
    processed = 0
    skipped = 0
    total_in_tokens = 0
    total_out_tokens = 0
    for row in rows:
        key = f"{row['podcast_id']}|{row['source_episode']}|{row['source_idx']}|{row['target_podcast']}|{row['target_episode']}|{row['target_idx']}"
        if key in existing:
            skipped += 1
            continue
        meta_a = f"{row['source_episode']}: {row['source_title']} ({row['source_guest']})"
        meta_b = f"{row['target_episode']}: {row['target_title']} ({row['target_guest']})"
        result = classify_pair(
            client,
            row["source_text"][:800], meta_a,
            row["target_text"][:800], meta_b,
        )
        in_t, out_t = result.pop("_tokens", (0, 0))
        total_in_tokens += in_t
        total_out_tokens += out_t
        db.execute(
            "INSERT INTO argument_links (source_podcast, source_episode, source_idx, "
            "target_podcast, target_episode, target_idx, relation, confidence, explanation, score) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (row["podcast_id"], row["source_episode"], row["source_idx"],
             row["target_podcast"], row["target_episode"], row["target_idx"],
             result.get("relation", "error"), result.get("confidence", 0),
             result.get("explanation", ""), row["score"])
        )
        processed += 1
        if processed % 10 == 0:
            db.commit()
            print(f" {processed}/{len(rows) - skipped} classified…")
        # Rate limiting between API calls
        time.sleep(0.3)
    db.commit()
    # Stats
    stats = db.execute("SELECT relation, COUNT(*) as c FROM argument_links GROUP BY relation ORDER BY c DESC").fetchall()
    print(f"\nDone: {processed} new, {skipped} skipped.")
    print("Distribution:")
    for s in stats:
        print(f" {s['relation']}: {s['c']}")
    # qwen-plus: ~$0.40/1M input tokens, ~$1.20/1M output tokens (DashScope intl, rough estimate)
    cost = total_in_tokens / 1e6 * 0.40 + total_out_tokens / 1e6 * 1.20
    print(f"Tokens: in={total_in_tokens} out={total_out_tokens} ~${cost:.4f}")
    db.close()


if __name__ == "__main__":
    main()