podcast-mindmap/scripts/json_utils.py
Dotty Dotter 839ae2c27e #13/#18 Robuster JSON-Parser + --rerun-errors-Modus
- scripts/json_utils.py: parse_llm_json() mit Codefence-Strip, balanced-brace-Extractor, Truncation-Repair, Inner-Quote-Escape und Trailing-Comma-Strip.
- scripts/analyse_arguments.py + scripts/curate_debates.py: nutzen den neuen Parser, drei Retries bei Netz/Rate-Limit, --rerun-errors-Pfad fuer das Reparieren bestehender error-Records, busy_timeout=60s gegen SQLite-Locks.
- scripts/rerun_errors.py: Standalone-Re-Runner fuer beide Tabellen (debates.topic='error' und argument_links.relation='error') mit Budget-Limit, behaelt IDs via UPDATE.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 00:30:45 +02:00

323 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Robuster JSON-Parser fuer LLM-Antworten.
Behebt typische Probleme:
- Markdown-Codefences (```json ... ```)
- Vorspann/Nachspann ausserhalb des JSON-Blocks
- Trailing commas
- Unescaped quotes innerhalb von Strings (heuristisch)
- Smart-Quotes
"""
import json
import re
from typing import Any, Optional
def _strip_codefence(s: str) -> str:
s = s.strip()
if s.startswith("```"):
# entferne erste Zeile ```... und schliessende ```
s = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n?", "", s)
s = re.sub(r"\n?```\s*$", "", s)
return s.strip()
def _find_balanced(s: str, open_char: str, close_char: str) -> Optional[str]:
"""Extrahiere ersten balancierten {...} oder [...]-Block, respektiert String-Literals.
Wenn keine vollstaendige Balance erreicht wird (truncated JSON), wird der bis zum
Ende verfuegbare Block zurueckgegeben — das Repair-Pipeline-Stadium kann den dann
ggf. ergaenzen.
"""
start = s.find(open_char)
if start == -1:
return None
depth = 0
in_str = False
esc = False
for i in range(start, len(s)):
c = s[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
continue
if c == open_char:
depth += 1
elif c == close_char:
depth -= 1
if depth == 0:
return s[start:i + 1]
# Truncated: gib trotzdem den bisher gesehenen Block zurueck
return s[start:]
def _close_truncated(block: str, open_char: str, close_char: str) -> str:
"""Schliesst einen abgeschnittenen JSON-Block heuristisch.
Ansatz:
1. Scanne Zeichen, tracke (in_string, esc, depth).
2. Wenn am Ende ein String offen ist: schliesse mit ".
3. Schneide einen evtl. unvollstaendigen Wert-Tail nach dem letzten
sicheren Komma/Open-Brace/Close-Brace.
4. Ergaenze fehlende } / ] entsprechend depth.
"""
s = block
in_str = False
esc = False
depth = 0
# last_safe = Position direkt nach einem komplett-abgeschlossenen Element
# (komma, open, close), das heisst: wir koennen dort ohne Datenverlust schneiden.
last_safe = 0
for i, c in enumerate(s):
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
last_safe = i + 1
continue
if c == '"':
in_str = True
continue
if c in "{[":
depth += 1
last_safe = i + 1
elif c in "}]":
depth -= 1
last_safe = i + 1
elif c == ",":
last_safe = i # vor dem Komma ist sicher
elif c == ":":
# Doppelpunkt: kein safe-cut hier
pass
elif not c.isspace():
# Wert-Token (Zahl, true/false/null)
last_safe = i + 1
# Falls String am Ende offen: alle Zeichen behalten, am Ende " ergaenzen.
# Sonst: schneiden auf last_safe (entfernt unvollstaendige Werte/Keys).
if in_str:
# String einfach schliessen, lass Inhalt drin
s = s + '"'
else:
s = s[:last_safe] if last_safe > 0 else s
# Trailing whitespace + comma entfernen
s = re.sub(r"[\s,]+$", "", s)
# Pruefe ob letzter Token ein Key ohne Wert ist: "..." am Ende vor depth-close
# Pattern: ... "key" oder ... "key": (ohne Wert) -> entferne diesen unfertigen Eintrag
# Naive Heuristik: wenn der Inhalt mit "key" oder "key": endet ohne folgenden Wert,
# schneide bis zum letzten , oder { vor dieser Stelle.
# Recompute depth nach den Aenderungen
depth = 0
in_str = False
esc = False
for c in s:
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
depth += 1
elif c in "}]":
depth -= 1
# Wenn wir mit "key" oder "key": (ohne Wert!) enden, schneide bis vorheriger ,/{.
# Wichtig: nur wenn vor diesem `"..."` ein `,` oder `{` (also Key-Position) liegt,
# nicht wenn ein `:` (Wert-Position) liegt.
tail_match = re.search(r'("[^"]*")(\s*:?)\s*$', s)
if tail_match and not s.rstrip().endswith(("}", "]")):
before = s[:tail_match.start()].rstrip()
prev_char = before[-1] if before else ""
# Nur trimmen, wenn dies ein Key ohne Wert ist (vor sich , oder {)
if prev_char in ",{":
cut = max(s.rfind(",", 0, tail_match.start()), s.rfind("{", 0, tail_match.start()))
if cut > 0:
s = s[:cut].rstrip().rstrip(",")
# depth neu berechnen
depth = 0
in_str = False
esc = False
for c in s:
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
depth += 1
elif c in "}]":
depth -= 1
# Fehlende Klammern ergaenzen — kann gemischt sein, einfach von rechts pruefen
# was offen ist.
# Wir wissen: am Anfang ist open_char, depth zaehlt {[ +1 und }] -1.
# Fuer korrektes Schliessen muessen wir die Reihenfolge der offenen
# Klammern kennen. Vereinfachung: zaehle separat.
open_curly = s.count("{") - s.count("}")
open_brack = s.count("[") - s.count("]")
# Annahme: schliessende Klammern in umgekehrter Reihenfolge der oeffnenden
# Naive: suche letzte offene Klammer und schliesse damit.
while open_curly > 0 or open_brack > 0:
# finde letzte offene Klammer im String (ausserhalb von strings)
last_open = None
in_str = False
esc = False
for i, c in enumerate(s):
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
last_open = (i, c)
if last_open is None:
break
# schliesse die zuletzt geoeffnete (innerste am rechten Rand)
# Aber: koennten dazwischen schon geschlossene sein. Vereinfacht:
# schliesse ab Ende.
if open_curly > 0 and (open_brack == 0 or last_open[1] == "{"):
s += "}"
open_curly -= 1
elif open_brack > 0:
s += "]"
open_brack -= 1
else:
break
return s
def _normalize_quotes(s: str) -> str:
# Ersetze typografische Anfuehrungszeichen durch ASCII (nur ausserhalb von String-Werten heikel,
# aber pragmatisch: Modelle setzen sie fast nur als Begrenzer falsch).
return (s.replace("", '"').replace("", '"')
.replace("", '"').replace("", '"')
.replace("", "'").replace("", "'"))
def _strip_trailing_commas(s: str) -> str:
return re.sub(r",(\s*[}\]])", r"\1", s)
def _escape_inner_quotes(block: str) -> str:
"""Heuristik: in JSON-Strings unescaped " in escaped \" umwandeln.
Idee: Wir scannen Token fuer Token. Wenn wir in einem String sind und ein " auftritt,
pruefen wir, ob danach ein Strukturzeichen (`,`, `}`, `]`, `:` mit moeglichem Whitespace)
folgt. Wenn nicht, ist es ein eingebettetes Anfuehrungszeichen und wird escaped.
"""
out = []
in_str = False
esc = False
i = 0
while i < len(block):
c = block[i]
if not in_str:
out.append(c)
if c == '"':
in_str = True
i += 1
continue
# in_str = True
if esc:
out.append(c)
esc = False
i += 1
continue
if c == "\\":
out.append(c)
esc = True
i += 1
continue
if c == '"':
# Schau voraus: erlaubt nur whitespace + [,}\]:]
j = i + 1
while j < len(block) and block[j] in " \t\r\n":
j += 1
if j >= len(block) or block[j] in ",}]:":
# echtes Stringende
out.append(c)
in_str = False
else:
# eingebettetes Quote -> escapen
out.append("\\\"")
i += 1
continue
out.append(c)
i += 1
return "".join(out)
def parse_llm_json(content: str, expect: str = "object") -> Any:
"""Parst eine LLM-Antwort robust als JSON.
Args:
content: Rohantwort des Modells.
expect: 'object' oder 'array'.
Returns:
geparstes Python-Objekt.
Raises:
ValueError, wenn nichts geparst werden konnte.
"""
if content is None:
raise ValueError("leere Antwort")
s = _normalize_quotes(_strip_codefence(content))
open_c, close_c = ("{", "}") if expect == "object" else ("[", "]")
block = _find_balanced(s, open_c, close_c)
if block is None:
# Fallback: vielleicht steht doch das andere Format drin
alt_open, alt_close = ("[", "]") if expect == "object" else ("{", "}")
block = _find_balanced(s, alt_open, alt_close)
if block is None:
raise ValueError(f"kein {expect} gefunden in: {content[:200]}")
closed = _close_truncated(block, open_c, close_c)
attempts = [
block,
_strip_trailing_commas(block),
_escape_inner_quotes(block),
_strip_trailing_commas(_escape_inner_quotes(block)),
closed,
_strip_trailing_commas(closed),
_escape_inner_quotes(closed),
_strip_trailing_commas(_escape_inner_quotes(closed)),
]
last_err = None
for attempt in attempts:
try:
return json.loads(attempt)
except json.JSONDecodeError as e:
last_err = e
continue
raise ValueError(f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}")