From 839ae2c27ea7327c3232c78a33514b1d5a01edad Mon Sep 17 00:00:00 2001
From: Dotty Dotter
Date: Tue, 28 Apr 2026 00:30:45 +0200
Subject: [PATCH] #13/#18 More robust JSON parser + --rerun-errors mode

- scripts/json_utils.py: parse_llm_json() with code-fence stripping, a
  balanced-brace extractor, truncation repair, inner-quote escaping and
  trailing-comma stripping.
- scripts/analyse_arguments.py + scripts/curate_debates.py: use the new
  parser, retry up to three times on network/rate-limit errors, gain a
  --rerun-errors path for repairing existing error records, and set
  busy_timeout=60s against SQLite locks.
- scripts/rerun_errors.py: standalone re-runner for both tables
  (debates.topic='error' and argument_links.relation='error') with a
  budget limit; keeps record IDs via UPDATE.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 scripts/analyse_arguments.py | 138 ++++++++++----
 scripts/curate_debates.py    | 137 ++++++++++----
 scripts/json_utils.py        | 322 +++++++++++++++++++++++++++++++++++
 scripts/rerun_errors.py      | 239 ++++++++++++++++++++++++++
 4 files changed, 745 insertions(+), 91 deletions(-)
 create mode 100644 scripts/json_utils.py
 create mode 100644 scripts/rerun_errors.py

diff --git a/scripts/analyse_arguments.py b/scripts/analyse_arguments.py
index 6c7c941..5ebb8a5 100644
--- a/scripts/analyse_arguments.py
+++ b/scripts/analyse_arguments.py
@@ -16,8 +16,13 @@
 import sqlite3
 
 from openai import OpenAI
 
+# Local helper
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from json_utils import parse_llm_json
+
 DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
-LIMIT = int(sys.argv[2]) if len(sys.argv) > 2 else 500
+LIMIT = int(sys.argv[2]) if len(sys.argv) > 2 and not sys.argv[2].startswith("--") else 500
+RERUN_ERRORS = "--rerun-errors" in sys.argv
 API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
 BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
@@ -46,25 +51,37 @@
 Absatz B ({meta_b}):
 
 Welche logische Relation besteht von A zu B?"""
 
-    try:
-        resp = client.chat.completions.create(
-            model=MODEL,
-            messages=[
-                {"role": "system", "content": SYSTEM_PROMPT},
-                {"role": "user", "content": user_msg},
-            ],
-            temperature=0.1,
-            max_tokens=150,
-        )
-        content = resp.choices[0].message.content.strip()
-        # Parse JSON from response
-        if content.startswith("```"):
-            content = content.split("```")[1].strip()
-            if content.startswith("json"):
-                content = content[4:].strip()
-        return json.loads(content)
-    except Exception as e:
-        return {"relation": "error", "confidence": 0, "explanation": str(e)}
+    last_err = None
+    for attempt in range(3):
+        try:
+            resp = client.chat.completions.create(
+                model=MODEL,
+                messages=[
+                    {"role": "system", "content": SYSTEM_PROMPT},
+                    {"role": "user", "content": user_msg},
+                ],
+                temperature=0.1,
+                max_tokens=200,
+            )
+            content = resp.choices[0].message.content
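+            # usage may be missing on the response; fall back to (0, 0) token counts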
+            usage = getattr(resp, "usage", None)
+            tokens = (usage.prompt_tokens, usage.completion_tokens) if usage else (0, 0)
+            try:
+                parsed = parse_llm_json(content, expect="object")
+                parsed["_tokens"] = tokens
+                return parsed
+            except ValueError as pe:
+                last_err = f"parse: {pe}"
+                # No retry on a parse failure: the model would just return the same thing.
+                break
+        except Exception as e:
+            last_err = str(e)
+            # Retry on network/rate-limit errors
+            if attempt < 2:
+                time.sleep(2 ** attempt)
+                continue
+            break
+    return {"relation": "error", "confidence": 0, "explanation": str(last_err), "_tokens": (0, 0)}
 
 def main():
@@ -73,7 +90,8 @@ def main():
         sys.exit(1)
 
     client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
-    db = sqlite3.connect(DB_PATH)
+    db = sqlite3.connect(DB_PATH, timeout=60.0)
+    db.execute("PRAGMA busy_timeout=60000")
     db.row_factory = sqlite3.Row
 
     # Create output table
@@ -87,35 +105,58 @@ def main():
     CREATE INDEX IF NOT EXISTS idx_arglinks ON argument_links(relation);
     """)
 
-    # Get top semantic links (cross-episode, prefer cross-podcast)
-    rows = db.execute("""
-        SELECT sl.podcast_id, sl.source_episode, sl.source_idx,
-               sl.target_podcast, sl.target_episode, sl.target_idx, sl.score,
-               p1.text as source_text, p2.text as target_text,
-               e1.title as source_title, e1.guest as source_guest,
-               e2.title as target_title, e2.guest as target_guest
-        FROM semantic_links sl
-        JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx
-        JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx
-        JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id
-        JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id
-        WHERE sl.source_episode != sl.target_episode
-        ORDER BY sl.score DESC
-        LIMIT ?
-    """, (LIMIT,)).fetchall()
+    if RERUN_ERRORS:
+        # Fetch the error records, delete them, and rebuild the input list from them.
+        err_rows = db.execute("""
+            SELECT al.source_podcast as podcast_id, al.source_episode, al.source_idx,
+                   al.target_podcast, al.target_episode, al.target_idx, al.score,
+                   p1.text as source_text, p2.text as target_text,
+                   e1.title as source_title, e1.guest as source_guest,
+                   e2.title as target_title, e2.guest as target_guest
+            FROM argument_links al
+            JOIN paragraphs p1 ON al.source_podcast = p1.podcast_id AND al.source_episode = p1.episode_id AND al.source_idx = p1.idx
+            JOIN paragraphs p2 ON al.target_podcast = p2.podcast_id AND al.target_episode = p2.episode_id AND al.target_idx = p2.idx
+            JOIN episodes e1 ON al.source_podcast = e1.podcast_id AND al.source_episode = e1.id
+            JOIN episodes e2 ON al.target_podcast = e2.podcast_id AND al.target_episode = e2.id
+            WHERE al.relation = 'error'
+        """).fetchall()
+        rows = err_rows
+        del_count = db.execute("DELETE FROM argument_links WHERE relation='error'").rowcount
+        db.commit()
+        print(f"RE-RUN: {del_count} error-Records geloescht, {len(rows)} werden neu klassifiziert.")
+        existing = set()
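+        # nothing to skip: the error rows were just deleted and get re-inserted below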
+    else:
+        # Get top semantic links (cross-episode, prefer cross-podcast)
+        rows = db.execute("""
+            SELECT sl.podcast_id, sl.source_episode, sl.source_idx,
+                   sl.target_podcast, sl.target_episode, sl.target_idx, sl.score,
+                   p1.text as source_text, p2.text as target_text,
+                   e1.title as source_title, e1.guest as source_guest,
+                   e2.title as target_title, e2.guest as target_guest
+            FROM semantic_links sl
+            JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx
+            JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx
+            JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id
+            JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id
+            WHERE sl.source_episode != sl.target_episode
+            ORDER BY sl.score DESC
+            LIMIT ?
+        """, (LIMIT,)).fetchall()
 
-    print(f"Klassifiziere {len(rows)} Paare mit {MODEL}…")
+        print(f"Klassifiziere {len(rows)} Paare mit {MODEL}…")
 
-    # Check already processed
-    existing = set()
-    try:
-        for r in db.execute("SELECT source_podcast||source_episode||source_idx||target_podcast||target_episode||target_idx as k FROM argument_links").fetchall():
-            existing.add(r["k"])
-    except Exception:
-        pass
+        # Check already processed
+        existing = set()
+        try:
+            for r in db.execute("SELECT source_podcast||source_episode||source_idx||target_podcast||target_episode||target_idx as k FROM argument_links").fetchall():
+                existing.add(r["k"])
+        except Exception:
+            pass
 
     processed = 0
     skipped = 0
+    total_in_tokens = 0
+    total_out_tokens = 0
 
     for i, row in enumerate(rows):
         key = f"{row['podcast_id']}{row['source_episode']}{row['source_idx']}{row['target_podcast']}{row['target_episode']}{row['target_idx']}"
@@ -132,6 +173,10 @@ def main():
             row["target_text"][:800], meta_b
         )
 
+        in_t, out_t = result.pop("_tokens", (0, 0))
+        total_in_tokens += in_t
+        total_out_tokens += out_t
+
         db.execute(
             "INSERT INTO argument_links (source_podcast, source_episode, source_idx, "
             "target_podcast, target_episode, target_idx, relation, confidence, explanation, score) "
@@ -158,6 +203,9 @@ def main():
     print("Verteilung:")
     for s in stats:
         print(f"  {s['relation']}: {s['c']}")
+    # qwen-plus: ~$0.40/1M input, ~$1.20/1M output (DashScope intl, rough estimate)
+    cost = total_in_tokens / 1e6 * 0.40 + total_out_tokens / 1e6 * 1.20
+    print(f"Tokens: in={total_in_tokens} out={total_out_tokens} ~${cost:.4f}")
 
     db.close()
 
diff --git a/scripts/curate_debates.py b/scripts/curate_debates.py
index 85c30e6..8cc827f 100644
--- a/scripts/curate_debates.py
+++ b/scripts/curate_debates.py
@@ -15,8 +15,12 @@
 import sqlite3
 
 from openai import OpenAI
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from json_utils import parse_llm_json
+
 DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
-LIMIT = int(sys.argv[2]) if len(sys.argv) > 2 else 100
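+# argv[2] may now be the --rerun-errors flag, so only a non-flag value is read as LIMIT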
+LIMIT = int(sys.argv[2]) if len(sys.argv) > 2 and not sys.argv[2].startswith("--") else 100
+RERUN_ERRORS = "--rerun-errors" in sys.argv
 API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
 BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
 MODEL = "qwen-plus"
@@ -42,24 +46,35 @@
 Podcast B — {meta_b}:
 
 Erstelle die Gegenüberstellung."""
 
-    try:
-        resp = client.chat.completions.create(
-            model=MODEL,
-            messages=[
-                {"role": "system", "content": SYSTEM_PROMPT},
-                {"role": "user", "content": user_msg},
-            ],
-            temperature=0.2,
-            max_tokens=300,
-        )
-        content = resp.choices[0].message.content.strip()
-        if content.startswith("```"):
-            content = content.split("```")[1].strip()
-            if content.startswith("json"):
-                content = content[4:].strip()
-        return json.loads(content)
-    except Exception as e:
-        return {"topic": "error", "agreement": "", "divergence": "", "insight": str(e)}
+    last_err = None
+    for attempt in range(3):
+        try:
+            resp = client.chat.completions.create(
+                model=MODEL,
+                messages=[
+                    {"role": "system", "content": SYSTEM_PROMPT},
+                    {"role": "user", "content": user_msg},
+                ],
+                temperature=0.2,
+                max_tokens=400,
+            )
+            content = resp.choices[0].message.content
+            usage = getattr(resp, "usage", None)
+            tokens = (usage.prompt_tokens, usage.completion_tokens) if usage else (0, 0)
+            try:
+                parsed = parse_llm_json(content, expect="object")
+                parsed["_tokens"] = tokens
+                return parsed
+            except ValueError as pe:
+                last_err = f"parse: {pe}"
+                break
+        except Exception as e:
+            last_err = str(e)
+            if attempt < 2:
+                time.sleep(2 ** attempt)
+                continue
+            break
+    return {"topic": "error", "agreement": "", "divergence": "", "insight": str(last_err), "_tokens": (0, 0)}
 
 def main():
@@ -68,7 +83,8 @@ def main():
         sys.exit(1)
 
     client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
-    db = sqlite3.connect(DB_PATH)
+    db = sqlite3.connect(DB_PATH, timeout=60.0)
+    db.execute("PRAGMA busy_timeout=60000")
     db.row_factory = sqlite3.Row
 
     db.executescript("""
@@ -82,36 +98,60 @@ def main():
     CREATE INDEX IF NOT EXISTS idx_debates_topic ON debates(topic);
     """)
 
-    # Get strongest cross-podcast links
-    rows = db.execute("""
-        SELECT sl.podcast_id, sl.source_episode, sl.source_idx,
-               sl.target_podcast, sl.target_episode, sl.target_idx, sl.score,
-               p1.text as source_text, p2.text as target_text,
-               pc1.name as source_podcast_name, pc2.name as target_podcast_name,
-               e1.title as source_title, e1.guest as source_guest,
-               e2.title as target_title, e2.guest as target_guest
-        FROM semantic_links sl
-        JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx
-        JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx
-        JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id
-        JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id
-        JOIN podcasts pc1 ON sl.podcast_id = pc1.id
-        JOIN podcasts pc2 ON sl.target_podcast = pc2.id
-        WHERE sl.podcast_id != sl.target_podcast
-        ORDER BY sl.score DESC
-        LIMIT ?
-    """, (LIMIT,)).fetchall()
+    if RERUN_ERRORS:
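+        # Re-join paragraph and episode metadata for each error record so the
+        # prompt can be rebuilt exactly as in a normal run.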
+        rows = db.execute("""
+            SELECT d.source_podcast as podcast_id, d.source_episode, d.source_idx,
+                   d.target_podcast, d.target_episode, d.target_idx, d.score,
+                   p1.text as source_text, p2.text as target_text,
+                   pc1.name as source_podcast_name, pc2.name as target_podcast_name,
+                   e1.title as source_title, e1.guest as source_guest,
+                   e2.title as target_title, e2.guest as target_guest
+            FROM debates d
+            JOIN paragraphs p1 ON d.source_podcast = p1.podcast_id AND d.source_episode = p1.episode_id AND d.source_idx = p1.idx
+            JOIN paragraphs p2 ON d.target_podcast = p2.podcast_id AND d.target_episode = p2.episode_id AND d.target_idx = p2.idx
+            JOIN episodes e1 ON d.source_podcast = e1.podcast_id AND d.source_episode = e1.id
+            JOIN episodes e2 ON d.target_podcast = e2.podcast_id AND d.target_episode = e2.id
+            JOIN podcasts pc1 ON d.source_podcast = pc1.id
+            JOIN podcasts pc2 ON d.target_podcast = pc2.id
+            WHERE d.topic = 'error'
+        """).fetchall()
+        del_count = db.execute("DELETE FROM debates WHERE topic='error'").rowcount
+        db.commit()
+        print(f"RE-RUN: {del_count} error-Records geloescht, {len(rows)} werden neu kuratiert.")
+        existing = set()
+    else:
+        # Get strongest cross-podcast links
+        rows = db.execute("""
+            SELECT sl.podcast_id, sl.source_episode, sl.source_idx,
+                   sl.target_podcast, sl.target_episode, sl.target_idx, sl.score,
+                   p1.text as source_text, p2.text as target_text,
+                   pc1.name as source_podcast_name, pc2.name as target_podcast_name,
+                   e1.title as source_title, e1.guest as source_guest,
+                   e2.title as target_title, e2.guest as target_guest
+            FROM semantic_links sl
+            JOIN paragraphs p1 ON sl.podcast_id = p1.podcast_id AND sl.source_episode = p1.episode_id AND sl.source_idx = p1.idx
+            JOIN paragraphs p2 ON sl.target_podcast = p2.podcast_id AND sl.target_episode = p2.episode_id AND sl.target_idx = p2.idx
+            JOIN episodes e1 ON sl.podcast_id = e1.podcast_id AND sl.source_episode = e1.id
+            JOIN episodes e2 ON sl.target_podcast = e2.podcast_id AND sl.target_episode = e2.id
+            JOIN podcasts pc1 ON sl.podcast_id = pc1.id
+            JOIN podcasts pc2 ON sl.target_podcast = pc2.id
+            WHERE sl.podcast_id != sl.target_podcast
+            ORDER BY sl.score DESC
+            LIMIT ?
+        """, (LIMIT,)).fetchall()
 
-    print(f"Kuratiere {len(rows)} Cross-Podcast-Debatten mit {MODEL}…")
+        print(f"Kuratiere {len(rows)} Cross-Podcast-Debatten mit {MODEL}…")
 
-    existing = set()
-    try:
-        for r in db.execute("SELECT source_podcast||source_episode||source_idx||target_podcast||target_episode||target_idx as k FROM debates").fetchall():
-            existing.add(r["k"])
-    except Exception:
-        pass
+        existing = set()
+        try:
+            for r in db.execute("SELECT source_podcast||source_episode||source_idx||target_podcast||target_episode||target_idx as k FROM debates").fetchall():
+                existing.add(r["k"])
+        except Exception:
+            pass
 
     processed = 0
+    total_in_tokens = 0
+    total_out_tokens = 0
     for i, row in enumerate(rows):
         key = f"{row['podcast_id']}{row['source_episode']}{row['source_idx']}{row['target_podcast']}{row['target_episode']}{row['target_idx']}"
         if key in existing:
@@ -121,6 +161,9 @@ def main():
         meta_b = f"{row['target_podcast_name']} / {row['target_episode']}: {row['target_title']} ({row['target_guest']})"
 
         result = curate_pair(client, row["source_text"][:800], meta_a, row["target_text"][:800], meta_b)
+        in_t, out_t = result.pop("_tokens", (0, 0))
+        total_in_tokens += in_t
+        total_out_tokens += out_t
 
         db.execute(
             "INSERT INTO debates (topic, source_podcast, source_episode, source_idx, "
@@ -146,6 +189,8 @@ def main():
     print("Top-Themen:")
     for t in topics:
         print(f"  {t['topic']}: {t['c']}")
+    cost = total_in_tokens / 1e6 * 0.40 + total_out_tokens / 1e6 * 1.20
+    print(f"Tokens: in={total_in_tokens} out={total_out_tokens} ~${cost:.4f}")
 
     db.close()
 
diff --git a/scripts/json_utils.py b/scripts/json_utils.py
new file mode 100644
index 0000000..c80b7ec
--- /dev/null
+++ b/scripts/json_utils.py
@@ -0,0 +1,322 @@
+"""Robust JSON parsing for LLM responses.
+
+Fixes the usual problems:
+- Markdown code fences (```json ... ```)
+- leading/trailing chatter outside the JSON block
+- trailing commas
+- unescaped quotes inside strings (heuristic)
+- smart quotes
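+
+Minimal usage sketch (the call pattern used by the scripts in this repo):
+
+    from json_utils import parse_llm_json
+    parse_llm_json('```json\n{"a": 1,}\n```', expect="object")  # -> {"a": 1}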
+ """ + start = s.find(open_char) + if start == -1: + return None + depth = 0 + in_str = False + esc = False + for i in range(start, len(s)): + c = s[i] + if in_str: + if esc: + esc = False + elif c == "\\": + esc = True + elif c == '"': + in_str = False + continue + if c == '"': + in_str = True + continue + if c == open_char: + depth += 1 + elif c == close_char: + depth -= 1 + if depth == 0: + return s[start:i + 1] + # Truncated: gib trotzdem den bisher gesehenen Block zurueck + return s[start:] + + +def _close_truncated(block: str, open_char: str, close_char: str) -> str: + """Schliesst einen abgeschnittenen JSON-Block heuristisch. + + Ansatz: + 1. Scanne Zeichen, tracke (in_string, esc, depth). + 2. Wenn am Ende ein String offen ist: schliesse mit ". + 3. Schneide einen evtl. unvollstaendigen Wert-Tail nach dem letzten + sicheren Komma/Open-Brace/Close-Brace. + 4. Ergaenze fehlende } / ] entsprechend depth. + """ + s = block + in_str = False + esc = False + depth = 0 + # last_safe = Position direkt nach einem komplett-abgeschlossenen Element + # (komma, open, close), das heisst: wir koennen dort ohne Datenverlust schneiden. + last_safe = 0 + for i, c in enumerate(s): + if in_str: + if esc: + esc = False + elif c == "\\": + esc = True + elif c == '"': + in_str = False + last_safe = i + 1 + continue + if c == '"': + in_str = True + continue + if c in "{[": + depth += 1 + last_safe = i + 1 + elif c in "}]": + depth -= 1 + last_safe = i + 1 + elif c == ",": + last_safe = i # vor dem Komma ist sicher + elif c == ":": + # Doppelpunkt: kein safe-cut hier + pass + elif not c.isspace(): + # Wert-Token (Zahl, true/false/null) + last_safe = i + 1 + + # Falls String am Ende offen: alle Zeichen behalten, am Ende " ergaenzen. + # Sonst: schneiden auf last_safe (entfernt unvollstaendige Werte/Keys). + if in_str: + # String einfach schliessen, lass Inhalt drin + s = s + '"' + else: + s = s[:last_safe] if last_safe > 0 else s + + # Trailing whitespace + comma entfernen + s = re.sub(r"[\s,]+$", "", s) + + # Pruefe ob letzter Token ein Key ohne Wert ist: "..." am Ende vor depth-close + # Pattern: ... "key" oder ... "key": (ohne Wert) -> entferne diesen unfertigen Eintrag + # Naive Heuristik: wenn der Inhalt mit "key" oder "key": endet ohne folgenden Wert, + # schneide bis zum letzten , oder { vor dieser Stelle. + # Recompute depth nach den Aenderungen + depth = 0 + in_str = False + esc = False + for c in s: + if in_str: + if esc: + esc = False + elif c == "\\": + esc = True + elif c == '"': + in_str = False + continue + if c == '"': + in_str = True + elif c in "{[": + depth += 1 + elif c in "}]": + depth -= 1 + + # Wenn wir mit "key" oder "key": (ohne Wert!) enden, schneide bis vorheriger ,/{. + # Wichtig: nur wenn vor diesem `"..."` ein `,` oder `{` (also Key-Position) liegt, + # nicht wenn ein `:` (Wert-Position) liegt. 
+    """
+    start = s.find(open_char)
+    if start == -1:
+        return None
+    depth = 0
+    in_str = False
+    esc = False
+    for i in range(start, len(s)):
+        c = s[i]
+        if in_str:
+            if esc:
+                esc = False
+            elif c == "\\":
+                esc = True
+            elif c == '"':
+                in_str = False
+            continue
+        if c == '"':
+            in_str = True
+            continue
+        if c == open_char:
+            depth += 1
+        elif c == close_char:
+            depth -= 1
+            if depth == 0:
+                return s[start:i + 1]
+    # truncated: return the block seen so far anyway
+    return s[start:]
+
+
+def _close_truncated(block: str, open_char: str, close_char: str) -> str:
+    """Heuristically close a truncated JSON block.
+
+    Approach:
+    1. Scan the characters, tracking (in_string, esc).
+    2. If a string is still open at the end: close it with ".
+    3. Cut off any incomplete value tail after the last safe
+       comma/open-brace/close-brace.
+    4. Append the missing } / ] for whatever is still open.
+    """
+    s = block
+    in_str = False
+    esc = False
+    # last_safe = position right after a completely closed element
+    # (comma, open, close), i.e. a point where we can cut without losing data.
+    last_safe = 0
+    for i, c in enumerate(s):
+        if in_str:
+            if esc:
+                esc = False
+            elif c == "\\":
+                esc = True
+            elif c == '"':
+                in_str = False
+                last_safe = i + 1
+            continue
+        if c == '"':
+            in_str = True
+            continue
+        if c in "{[":
+            last_safe = i + 1
+        elif c in "}]":
+            last_safe = i + 1
+        elif c == ",":
+            last_safe = i  # the position just before the comma is safe
+        elif c == ":":
+            # colon: no safe cut here
+            pass
+        elif not c.isspace():
+            # value token (number, true/false/null)
+            last_safe = i + 1
+
+    # If a string is still open at the end: keep its content and just close it.
+    # Otherwise: cut at last_safe (drops incomplete values/keys).
+    if in_str:
+        s = s + '"'
+    else:
+        s = s[:last_safe] if last_safe > 0 else s
+
+    # strip trailing whitespace + comma
+    s = re.sub(r"[\s,]+$", "", s)
+
+    # If we end with "key" or "key": (without a value!), cut back to the
+    # previous , or {. Important: only when a `,` or `{` (key position)
+    # precedes the `"..."`, not when a `:` (value position) does.
+    tail_match = re.search(r'("[^"]*")(\s*:?)\s*$', s)
+    if tail_match and not s.rstrip().endswith(("}", "]")):
+        before = s[:tail_match.start()].rstrip()
+        prev_char = before[-1] if before else ""
+        # only trim if this really is a key without a value (preceded by , or {)
+        if prev_char in ",{":
+            cut = max(s.rfind(",", 0, tail_match.start()), s.rfind("{", 0, tail_match.start()))
+            if cut > 0:
+                s = s[:cut].rstrip().rstrip(",")
+
+    # Append the missing closers. Track the stack of brackets that are still
+    # open (outside string literals) and close them in reverse order, so that
+    # mixed {...[...] nesting gets the right closer at each level.
+    stack = []
+    in_str = False
+    esc = False
+    for c in s:
+        if in_str:
+            if esc:
+                esc = False
+            elif c == "\\":
+                esc = True
+            elif c == '"':
+                in_str = False
+            continue
+        if c == '"':
+            in_str = True
+        elif c in "{[":
+            stack.append(c)
+        elif c in "}]" and stack:
+            stack.pop()
+    while stack:
+        s += "}" if stack.pop() == "{" else "]"
+    return s
+
+
+def _normalize_quotes(s: str) -> str:
+    # Replace typographic quotes with ASCII ones. (Risky inside string values,
+    # but pragmatic: models almost only misplace them as delimiters.)
+    return (s.replace("“", '"').replace("”", '"')
+            .replace("„", '"').replace("‟", '"')
+            .replace("‘", "'").replace("’", "'"))
+
+
+def _strip_trailing_commas(s: str) -> str:
+    return re.sub(r",(\s*[}\]])", r"\1", s)
+
+
+def _escape_inner_quotes(block: str) -> str:
+    """Heuristic: turn unescaped " inside JSON strings into escaped \".
+
+    Idea: scan token by token. While inside a string, whenever a " appears,
+    check whether a structural character (`,`, `}`, `]`, `:`, possibly after
+    whitespace) follows. If not, it is an embedded quote and gets escaped.
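+
+    Example: '{"a": "say "hi" now"}' becomes '{"a": "say \"hi\" now"}'.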
+ """ + out = [] + in_str = False + esc = False + i = 0 + while i < len(block): + c = block[i] + if not in_str: + out.append(c) + if c == '"': + in_str = True + i += 1 + continue + # in_str = True + if esc: + out.append(c) + esc = False + i += 1 + continue + if c == "\\": + out.append(c) + esc = True + i += 1 + continue + if c == '"': + # Schau voraus: erlaubt nur whitespace + [,}\]:] + j = i + 1 + while j < len(block) and block[j] in " \t\r\n": + j += 1 + if j >= len(block) or block[j] in ",}]:": + # echtes Stringende + out.append(c) + in_str = False + else: + # eingebettetes Quote -> escapen + out.append("\\\"") + i += 1 + continue + out.append(c) + i += 1 + return "".join(out) + + +def parse_llm_json(content: str, expect: str = "object") -> Any: + """Parst eine LLM-Antwort robust als JSON. + + Args: + content: Rohantwort des Modells. + expect: 'object' oder 'array'. + + Returns: + geparstes Python-Objekt. + + Raises: + ValueError, wenn nichts geparst werden konnte. + """ + if content is None: + raise ValueError("leere Antwort") + s = _normalize_quotes(_strip_codefence(content)) + + open_c, close_c = ("{", "}") if expect == "object" else ("[", "]") + block = _find_balanced(s, open_c, close_c) + if block is None: + # Fallback: vielleicht steht doch das andere Format drin + alt_open, alt_close = ("[", "]") if expect == "object" else ("{", "}") + block = _find_balanced(s, alt_open, alt_close) + if block is None: + raise ValueError(f"kein {expect} gefunden in: {content[:200]}") + + closed = _close_truncated(block, open_c, close_c) + attempts = [ + block, + _strip_trailing_commas(block), + _escape_inner_quotes(block), + _strip_trailing_commas(_escape_inner_quotes(block)), + closed, + _strip_trailing_commas(closed), + _escape_inner_quotes(closed), + _strip_trailing_commas(_escape_inner_quotes(closed)), + ] + + last_err = None + for attempt in attempts: + try: + return json.loads(attempt) + except json.JSONDecodeError as e: + last_err = e + continue + raise ValueError(f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}") diff --git a/scripts/rerun_errors.py b/scripts/rerun_errors.py new file mode 100644 index 0000000..79ec046 --- /dev/null +++ b/scripts/rerun_errors.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +"""Re-Run der fehlerhaften Records in `debates` und `argument_links` mit robustem JSON-Parser. + +Vorgehen: +- Lade alle Records mit topic='error' (debates) bzw. relation='error' (argument_links) +- Hole Source-/Target-Paragraph aus DB +- Sende erneut an qwen-plus, parse mit json_utils.parse_llm_json +- UPDATE bestehenden Eintrag + +Nutzung: + DASHSCOPE_API_KEY=... python3 rerun_errors.py [db-pfad] +""" + +import json +import os +import sys +import time +import sqlite3 + +from openai import OpenAI + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from json_utils import parse_llm_json + +DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite" +API_KEY = os.environ.get("DASHSCOPE_API_KEY", "") +BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1" +MODEL = "qwen-plus" + +# Kosten qwen-plus (DashScope intl, Stand 2025): $0.0008 / 1k input, $0.002 / 1k output +COST_IN = 0.0008 / 1000 +COST_OUT = 0.002 / 1000 + +DEBATES_SYSTEM = """Du bist ein Diskursanalyst. Du erhältst zwei Textabschnitte aus VERSCHIEDENEN Podcasts, die dasselbe Thema behandeln. + +Erstelle eine kurze Gegenüberstellung. Antworte NUR mit JSON: + +{ + "topic": "Das gemeinsame Thema in 3-5 Wörtern", + "agreement": "Worin stimmen beide überein? 
(1-2 Sätze)", + "divergence": "Worin unterscheiden sie sich? (1-2 Sätze, oder 'keine wesentliche Divergenz')", + "insight": "Was lernt man durch die Gegenüberstellung, das man aus keinem der beiden allein lernen würde? (1 Satz)" +}""" + +ARGS_SYSTEM = """Du bist ein Diskursanalyst. Du erhältst zwei Textabschnitte aus Podcast-Transkripten. +Klassifiziere die logische Relation zwischen ihnen. Antworte NUR mit einem JSON-Objekt: + +{"relation": "...", "confidence": 0.0-1.0, "explanation": "Ein Satz Begruendung"} + +Moegliche Relationen: +- "erweitert": B baut auf A auf, ergaenzt, vertieft +- "widerspricht": B widerspricht A, nennt Gegenargument +- "belegt": B liefert Evidenz/Daten fuer A's These +- "relativiert": B schraenkt A ein, nennt Ausnahmen/Bedingungen +- "gleicher_punkt": A und B sagen im Kern dasselbe +- "kein_bezug": Trotz thematischer Naehe kein logischer Bezug""" + + +class Budget: + def __init__(self, hard_limit_usd): + self.hard_limit = hard_limit_usd + self.tokens_in = 0 + self.tokens_out = 0 + + def add(self, usage): + if usage: + self.tokens_in += getattr(usage, "prompt_tokens", 0) or 0 + self.tokens_out += getattr(usage, "completion_tokens", 0) or 0 + + def cost(self): + return self.tokens_in * COST_IN + self.tokens_out * COST_OUT + + def over(self): + return self.cost() > self.hard_limit + + +def call_llm(client, system, user, max_tokens, budget): + last_err = None + for attempt in range(2): + try: + resp = client.chat.completions.create( + model=MODEL, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + temperature=0.1 if "Klassifiziere" in system else 0.2, + max_tokens=max_tokens, + ) + budget.add(getattr(resp, "usage", None)) + content = resp.choices[0].message.content + try: + return parse_llm_json(content, expect="object"), None + except ValueError as pe: + last_err = f"parse: {pe}" + # Ein Retry mit anderer Temperature waere moeglich; wir akzeptieren parse-fail + break + except Exception as e: + last_err = str(e) + if attempt < 1: + time.sleep(2) + continue + return None, last_err + + +def rerun_debates(db, client, budget): + rows = db.execute(""" + SELECT d.id as did, d.source_podcast, d.source_episode, d.source_idx, + d.target_podcast, d.target_episode, d.target_idx, + p1.text as source_text, p2.text as target_text, + pc1.name as source_pname, pc2.name as target_pname, + e1.title as source_title, e1.guest as source_guest, + e2.title as target_title, e2.guest as target_guest + FROM debates d + JOIN paragraphs p1 ON d.source_podcast = p1.podcast_id AND d.source_episode = p1.episode_id AND d.source_idx = p1.idx + JOIN paragraphs p2 ON d.target_podcast = p2.podcast_id AND d.target_episode = p2.episode_id AND d.target_idx = p2.idx + JOIN episodes e1 ON d.source_podcast = e1.podcast_id AND d.source_episode = e1.id + JOIN episodes e2 ON d.target_podcast = e2.podcast_id AND d.target_episode = e2.id + JOIN podcasts pc1 ON d.source_podcast = pc1.id + JOIN podcasts pc2 ON d.target_podcast = pc2.id + WHERE d.topic = 'error' + """).fetchall() + print(f"[debates] {len(rows)} Error-Records zu reparieren") + fixed = 0 + still_err = 0 + for i, r in enumerate(rows): + if budget.over(): + print(f"[debates] Kosten-Limit erreicht bei {i}/{len(rows)} (cost=${budget.cost():.4f})") + break + meta_a = f"{r['source_pname']} / {r['source_episode']}: {r['source_title']} ({r['source_guest']})" + meta_b = f"{r['target_pname']} / {r['target_episode']}: {r['target_title']} ({r['target_guest']})" + user = (f"Podcast A — 
+    def __init__(self, hard_limit_usd):
+        self.hard_limit = hard_limit_usd
+        self.tokens_in = 0
+        self.tokens_out = 0
+
+    def add(self, usage):
+        if usage:
+            self.tokens_in += getattr(usage, "prompt_tokens", 0) or 0
+            self.tokens_out += getattr(usage, "completion_tokens", 0) or 0
+
+    def cost(self):
+        return self.tokens_in * COST_IN + self.tokens_out * COST_OUT
+
+    def over(self):
+        return self.cost() > self.hard_limit
+
+
+def call_llm(client, system, user, max_tokens, budget):
+    last_err = None
+    for attempt in range(2):
+        try:
+            resp = client.chat.completions.create(
+                model=MODEL,
+                messages=[
+                    {"role": "system", "content": system},
+                    {"role": "user", "content": user},
+                ],
+                temperature=0.1 if "Klassifiziere" in system else 0.2,
+                max_tokens=max_tokens,
+            )
+            budget.add(getattr(resp, "usage", None))
+            content = resp.choices[0].message.content
+            try:
+                return parse_llm_json(content, expect="object"), None
+            except ValueError as pe:
+                last_err = f"parse: {pe}"
+                # a retry at a different temperature would be possible; we accept the parse failure
+                break
+        except Exception as e:
+            last_err = str(e)
+            if attempt < 1:
+                time.sleep(2)
+                continue
+    return None, last_err
+
+
+def rerun_debates(db, client, budget):
+    rows = db.execute("""
+        SELECT d.id as did, d.source_podcast, d.source_episode, d.source_idx,
+               d.target_podcast, d.target_episode, d.target_idx,
+               p1.text as source_text, p2.text as target_text,
+               pc1.name as source_pname, pc2.name as target_pname,
+               e1.title as source_title, e1.guest as source_guest,
+               e2.title as target_title, e2.guest as target_guest
+        FROM debates d
+        JOIN paragraphs p1 ON d.source_podcast = p1.podcast_id AND d.source_episode = p1.episode_id AND d.source_idx = p1.idx
+        JOIN paragraphs p2 ON d.target_podcast = p2.podcast_id AND d.target_episode = p2.episode_id AND d.target_idx = p2.idx
+        JOIN episodes e1 ON d.source_podcast = e1.podcast_id AND d.source_episode = e1.id
+        JOIN episodes e2 ON d.target_podcast = e2.podcast_id AND d.target_episode = e2.id
+        JOIN podcasts pc1 ON d.source_podcast = pc1.id
+        JOIN podcasts pc2 ON d.target_podcast = pc2.id
+        WHERE d.topic = 'error'
+    """).fetchall()
+    print(f"[debates] {len(rows)} Error-Records zu reparieren")
+    fixed = 0
+    still_err = 0
+    for i, r in enumerate(rows):
+        if budget.over():
+            print(f"[debates] Kosten-Limit erreicht bei {i}/{len(rows)} (cost=${budget.cost():.4f})")
+            break
+        meta_a = f"{r['source_pname']} / {r['source_episode']}: {r['source_title']} ({r['source_guest']})"
+        meta_b = f"{r['target_pname']} / {r['target_episode']}: {r['target_title']} ({r['target_guest']})"
+        user = (f"Podcast A — {meta_a}:\n\"{r['source_text'][:800]}\"\n\n"
+                f"Podcast B — {meta_b}:\n\"{r['target_text'][:800]}\"\n\n"
+                "Erstelle die Gegenueberstellung.")
+        result, err = call_llm(client, DEBATES_SYSTEM, user, 600, budget)
+        if result is not None and result.get("topic"):
+            db.execute(
+                "UPDATE debates SET topic=?, agreement=?, divergence=?, insight=? WHERE id=?",
+                (result.get("topic", "")[:120],
+                 result.get("agreement", "")[:1000],
+                 result.get("divergence", "")[:1000],
+                 result.get("insight", "")[:1000],
+                 r["did"]),
+            )
+            fixed += 1
+        else:
+            still_err += 1
+            db.execute(
+                "UPDATE debates SET insight=? WHERE id=?",
+                (f"rerun-failed: {err}"[:500], r["did"]),
+            )
+        if (i + 1) % 10 == 0:
+            db.commit()
+            print(f"  [debates] {i+1}/{len(rows)} gefixt={fixed} still_err={still_err} cost=${budget.cost():.4f}")
+        time.sleep(0.3)
+    db.commit()
+    print(f"[debates] fertig: gefixt={fixed} still_err={still_err}")
+    return fixed, still_err
+
+
+def rerun_args(db, client, budget):
+    rows = db.execute("""
+        SELECT a.id as aid, a.source_podcast, a.source_episode, a.source_idx,
+               a.target_podcast, a.target_episode, a.target_idx,
+               p1.text as source_text, p2.text as target_text,
+               e1.title as source_title, e1.guest as source_guest,
+               e2.title as target_title, e2.guest as target_guest
+        FROM argument_links a
+        JOIN paragraphs p1 ON a.source_podcast = p1.podcast_id AND a.source_episode = p1.episode_id AND a.source_idx = p1.idx
+        JOIN paragraphs p2 ON a.target_podcast = p2.podcast_id AND a.target_episode = p2.episode_id AND a.target_idx = p2.idx
+        JOIN episodes e1 ON a.source_podcast = e1.podcast_id AND a.source_episode = e1.id
+        JOIN episodes e2 ON a.target_podcast = e2.podcast_id AND a.target_episode = e2.id
+        WHERE a.relation = 'error'
+    """).fetchall()
+    print(f"[argument_links] {len(rows)} Error-Records zu reparieren")
+    fixed = 0
+    still_err = 0
+    for i, r in enumerate(rows):
+        if budget.over():
+            print(f"[args] Kosten-Limit erreicht bei {i}/{len(rows)} (cost=${budget.cost():.4f})")
+            break
+        meta_a = f"{r['source_episode']}: {r['source_title']} — {r['source_guest']}"
+        meta_b = f"{r['target_episode']}: {r['target_title']} — {r['target_guest']}"
+        user = (f"Absatz A ({meta_a}):\n\"{r['source_text'][:800]}\"\n\n"
+                f"Absatz B ({meta_b}):\n\"{r['target_text'][:800]}\"\n\n"
+                "Welche logische Relation besteht von A zu B?")
+        result, err = call_llm(client, ARGS_SYSTEM, user, 350, budget)
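+        # the model answering relation='error' itself still counts as a failure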
WHERE id=?", + (f"rerun-failed: {err}"[:500], r["aid"]), + ) + if (i + 1) % 20 == 0: + db.commit() + print(f" [args] {i+1}/{len(rows)} gefixt={fixed} still_err={still_err} cost=${budget.cost():.4f}") + time.sleep(0.25) + db.commit() + print(f"[argument_links] fertig: gefixt={fixed} still_err={still_err}") + return fixed, still_err + + +def main(): + if not API_KEY: + print("DASHSCOPE_API_KEY nicht gesetzt.") + sys.exit(1) + client = OpenAI(api_key=API_KEY, base_url=BASE_URL, timeout=30.0, max_retries=1) + db = sqlite3.connect(DB_PATH, timeout=30) + db.row_factory = sqlite3.Row + db.execute("PRAGMA busy_timeout=30000") + + # Aufgabe-A-Budget: 1 USD + budget = Budget(hard_limit_usd=1.0) + + print(f"DB: {DB_PATH}, Modell: {MODEL}") + d_fixed, d_err = rerun_debates(db, client, budget) + a_fixed, a_err = rerun_args(db, client, budget) + + print() + print("=== Zusammenfassung Aufgabe A ===") + print(f" debates gefixt={d_fixed} still_err={d_err}") + print(f" argument_links gefixt={a_fixed} still_err={a_err}") + print(f" Tokens in={budget.tokens_in} out={budget.tokens_out}") + print(f" Kosten ~${budget.cost():.4f}") + + db.close() + + +if __name__ == "__main__": + main()