- scripts/json_utils.py: parse_llm_json() mit Codefence-Strip, balanced-brace-Extractor, Truncation-Repair, Inner-Quote-Escape und Trailing-Comma-Strip. - scripts/analyse_arguments.py + scripts/curate_debates.py: nutzen den neuen Parser, drei Retries bei Netz/Rate-Limit, --rerun-errors-Pfad fuer das Reparieren bestehender error-Records, busy_timeout=60s gegen SQLite-Locks. - scripts/rerun_errors.py: Standalone-Re-Runner fuer beide Tabellen (debates.topic='error' und argument_links.relation='error') mit Budget-Limit, behaelt IDs via UPDATE. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
323 lines
10 KiB
Python
323 lines
10 KiB
Python
"""Robuster JSON-Parser fuer LLM-Antworten.
|
||
|
||
Behebt typische Probleme:
|
||
- Markdown-Codefences (```json ... ```)
|
||
- Vorspann/Nachspann ausserhalb des JSON-Blocks
|
||
- Trailing commas
|
||
- Unescaped quotes innerhalb von Strings (heuristisch)
|
||
- Smart-Quotes
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
from typing import Any, Optional
|
||
|
||
|
||
def _strip_codefence(s: str) -> str:
|
||
s = s.strip()
|
||
if s.startswith("```"):
|
||
# entferne erste Zeile ```... und schliessende ```
|
||
s = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n?", "", s)
|
||
s = re.sub(r"\n?```\s*$", "", s)
|
||
return s.strip()
|
||
|
||
|
||
def _find_balanced(s: str, open_char: str, close_char: str) -> Optional[str]:
|
||
"""Extrahiere ersten balancierten {...} oder [...]-Block, respektiert String-Literals.
|
||
|
||
Wenn keine vollstaendige Balance erreicht wird (truncated JSON), wird der bis zum
|
||
Ende verfuegbare Block zurueckgegeben — das Repair-Pipeline-Stadium kann den dann
|
||
ggf. ergaenzen.
|
||
"""
|
||
start = s.find(open_char)
|
||
if start == -1:
|
||
return None
|
||
depth = 0
|
||
in_str = False
|
||
esc = False
|
||
for i in range(start, len(s)):
|
||
c = s[i]
|
||
if in_str:
|
||
if esc:
|
||
esc = False
|
||
elif c == "\\":
|
||
esc = True
|
||
elif c == '"':
|
||
in_str = False
|
||
continue
|
||
if c == '"':
|
||
in_str = True
|
||
continue
|
||
if c == open_char:
|
||
depth += 1
|
||
elif c == close_char:
|
||
depth -= 1
|
||
if depth == 0:
|
||
return s[start:i + 1]
|
||
# Truncated: gib trotzdem den bisher gesehenen Block zurueck
|
||
return s[start:]
|
||
|
||
|
||
def _close_truncated(block: str, open_char: str, close_char: str) -> str:
|
||
"""Schliesst einen abgeschnittenen JSON-Block heuristisch.
|
||
|
||
Ansatz:
|
||
1. Scanne Zeichen, tracke (in_string, esc, depth).
|
||
2. Wenn am Ende ein String offen ist: schliesse mit ".
|
||
3. Schneide einen evtl. unvollstaendigen Wert-Tail nach dem letzten
|
||
sicheren Komma/Open-Brace/Close-Brace.
|
||
4. Ergaenze fehlende } / ] entsprechend depth.
|
||
"""
|
||
s = block
|
||
in_str = False
|
||
esc = False
|
||
depth = 0
|
||
# last_safe = Position direkt nach einem komplett-abgeschlossenen Element
|
||
# (komma, open, close), das heisst: wir koennen dort ohne Datenverlust schneiden.
|
||
last_safe = 0
|
||
for i, c in enumerate(s):
|
||
if in_str:
|
||
if esc:
|
||
esc = False
|
||
elif c == "\\":
|
||
esc = True
|
||
elif c == '"':
|
||
in_str = False
|
||
last_safe = i + 1
|
||
continue
|
||
if c == '"':
|
||
in_str = True
|
||
continue
|
||
if c in "{[":
|
||
depth += 1
|
||
last_safe = i + 1
|
||
elif c in "}]":
|
||
depth -= 1
|
||
last_safe = i + 1
|
||
elif c == ",":
|
||
last_safe = i # vor dem Komma ist sicher
|
||
elif c == ":":
|
||
# Doppelpunkt: kein safe-cut hier
|
||
pass
|
||
elif not c.isspace():
|
||
# Wert-Token (Zahl, true/false/null)
|
||
last_safe = i + 1
|
||
|
||
# Falls String am Ende offen: alle Zeichen behalten, am Ende " ergaenzen.
|
||
# Sonst: schneiden auf last_safe (entfernt unvollstaendige Werte/Keys).
|
||
if in_str:
|
||
# String einfach schliessen, lass Inhalt drin
|
||
s = s + '"'
|
||
else:
|
||
s = s[:last_safe] if last_safe > 0 else s
|
||
|
||
# Trailing whitespace + comma entfernen
|
||
s = re.sub(r"[\s,]+$", "", s)
|
||
|
||
# Pruefe ob letzter Token ein Key ohne Wert ist: "..." am Ende vor depth-close
|
||
# Pattern: ... "key" oder ... "key": (ohne Wert) -> entferne diesen unfertigen Eintrag
|
||
# Naive Heuristik: wenn der Inhalt mit "key" oder "key": endet ohne folgenden Wert,
|
||
# schneide bis zum letzten , oder { vor dieser Stelle.
|
||
# Recompute depth nach den Aenderungen
|
||
depth = 0
|
||
in_str = False
|
||
esc = False
|
||
for c in s:
|
||
if in_str:
|
||
if esc:
|
||
esc = False
|
||
elif c == "\\":
|
||
esc = True
|
||
elif c == '"':
|
||
in_str = False
|
||
continue
|
||
if c == '"':
|
||
in_str = True
|
||
elif c in "{[":
|
||
depth += 1
|
||
elif c in "}]":
|
||
depth -= 1
|
||
|
||
# Wenn wir mit "key" oder "key": (ohne Wert!) enden, schneide bis vorheriger ,/{.
|
||
# Wichtig: nur wenn vor diesem `"..."` ein `,` oder `{` (also Key-Position) liegt,
|
||
# nicht wenn ein `:` (Wert-Position) liegt.
|
||
tail_match = re.search(r'("[^"]*")(\s*:?)\s*$', s)
|
||
if tail_match and not s.rstrip().endswith(("}", "]")):
|
||
before = s[:tail_match.start()].rstrip()
|
||
prev_char = before[-1] if before else ""
|
||
# Nur trimmen, wenn dies ein Key ohne Wert ist (vor sich , oder {)
|
||
if prev_char in ",{":
|
||
cut = max(s.rfind(",", 0, tail_match.start()), s.rfind("{", 0, tail_match.start()))
|
||
if cut > 0:
|
||
s = s[:cut].rstrip().rstrip(",")
|
||
# depth neu berechnen
|
||
depth = 0
|
||
in_str = False
|
||
esc = False
|
||
for c in s:
|
||
if in_str:
|
||
if esc:
|
||
esc = False
|
||
elif c == "\\":
|
||
esc = True
|
||
elif c == '"':
|
||
in_str = False
|
||
continue
|
||
if c == '"':
|
||
in_str = True
|
||
elif c in "{[":
|
||
depth += 1
|
||
elif c in "}]":
|
||
depth -= 1
|
||
|
||
# Fehlende Klammern ergaenzen — kann gemischt sein, einfach von rechts pruefen
|
||
# was offen ist.
|
||
# Wir wissen: am Anfang ist open_char, depth zaehlt {[ +1 und }] -1.
|
||
# Fuer korrektes Schliessen muessen wir die Reihenfolge der offenen
|
||
# Klammern kennen. Vereinfachung: zaehle separat.
|
||
open_curly = s.count("{") - s.count("}")
|
||
open_brack = s.count("[") - s.count("]")
|
||
# Annahme: schliessende Klammern in umgekehrter Reihenfolge der oeffnenden
|
||
# Naive: suche letzte offene Klammer und schliesse damit.
|
||
while open_curly > 0 or open_brack > 0:
|
||
# finde letzte offene Klammer im String (ausserhalb von strings)
|
||
last_open = None
|
||
in_str = False
|
||
esc = False
|
||
for i, c in enumerate(s):
|
||
if in_str:
|
||
if esc:
|
||
esc = False
|
||
elif c == "\\":
|
||
esc = True
|
||
elif c == '"':
|
||
in_str = False
|
||
continue
|
||
if c == '"':
|
||
in_str = True
|
||
elif c in "{[":
|
||
last_open = (i, c)
|
||
if last_open is None:
|
||
break
|
||
# schliesse die zuletzt geoeffnete (innerste am rechten Rand)
|
||
# Aber: koennten dazwischen schon geschlossene sein. Vereinfacht:
|
||
# schliesse ab Ende.
|
||
if open_curly > 0 and (open_brack == 0 or last_open[1] == "{"):
|
||
s += "}"
|
||
open_curly -= 1
|
||
elif open_brack > 0:
|
||
s += "]"
|
||
open_brack -= 1
|
||
else:
|
||
break
|
||
return s
|
||
|
||
|
||
def _normalize_quotes(s: str) -> str:
|
||
# Ersetze typografische Anfuehrungszeichen durch ASCII (nur ausserhalb von String-Werten heikel,
|
||
# aber pragmatisch: Modelle setzen sie fast nur als Begrenzer falsch).
|
||
return (s.replace("“", '"').replace("”", '"')
|
||
.replace("„", '"').replace("‟", '"')
|
||
.replace("‘", "'").replace("’", "'"))
|
||
|
||
|
||
def _strip_trailing_commas(s: str) -> str:
|
||
return re.sub(r",(\s*[}\]])", r"\1", s)
|
||
|
||
|
||
def _escape_inner_quotes(block: str) -> str:
|
||
"""Heuristik: in JSON-Strings unescaped " in escaped \" umwandeln.
|
||
|
||
Idee: Wir scannen Token fuer Token. Wenn wir in einem String sind und ein " auftritt,
|
||
pruefen wir, ob danach ein Strukturzeichen (`,`, `}`, `]`, `:` mit moeglichem Whitespace)
|
||
folgt. Wenn nicht, ist es ein eingebettetes Anfuehrungszeichen und wird escaped.
|
||
"""
|
||
out = []
|
||
in_str = False
|
||
esc = False
|
||
i = 0
|
||
while i < len(block):
|
||
c = block[i]
|
||
if not in_str:
|
||
out.append(c)
|
||
if c == '"':
|
||
in_str = True
|
||
i += 1
|
||
continue
|
||
# in_str = True
|
||
if esc:
|
||
out.append(c)
|
||
esc = False
|
||
i += 1
|
||
continue
|
||
if c == "\\":
|
||
out.append(c)
|
||
esc = True
|
||
i += 1
|
||
continue
|
||
if c == '"':
|
||
# Schau voraus: erlaubt nur whitespace + [,}\]:]
|
||
j = i + 1
|
||
while j < len(block) and block[j] in " \t\r\n":
|
||
j += 1
|
||
if j >= len(block) or block[j] in ",}]:":
|
||
# echtes Stringende
|
||
out.append(c)
|
||
in_str = False
|
||
else:
|
||
# eingebettetes Quote -> escapen
|
||
out.append("\\\"")
|
||
i += 1
|
||
continue
|
||
out.append(c)
|
||
i += 1
|
||
return "".join(out)
|
||
|
||
|
||
def parse_llm_json(content: str, expect: str = "object") -> Any:
|
||
"""Parst eine LLM-Antwort robust als JSON.
|
||
|
||
Args:
|
||
content: Rohantwort des Modells.
|
||
expect: 'object' oder 'array'.
|
||
|
||
Returns:
|
||
geparstes Python-Objekt.
|
||
|
||
Raises:
|
||
ValueError, wenn nichts geparst werden konnte.
|
||
"""
|
||
if content is None:
|
||
raise ValueError("leere Antwort")
|
||
s = _normalize_quotes(_strip_codefence(content))
|
||
|
||
open_c, close_c = ("{", "}") if expect == "object" else ("[", "]")
|
||
block = _find_balanced(s, open_c, close_c)
|
||
if block is None:
|
||
# Fallback: vielleicht steht doch das andere Format drin
|
||
alt_open, alt_close = ("[", "]") if expect == "object" else ("{", "}")
|
||
block = _find_balanced(s, alt_open, alt_close)
|
||
if block is None:
|
||
raise ValueError(f"kein {expect} gefunden in: {content[:200]}")
|
||
|
||
closed = _close_truncated(block, open_c, close_c)
|
||
attempts = [
|
||
block,
|
||
_strip_trailing_commas(block),
|
||
_escape_inner_quotes(block),
|
||
_strip_trailing_commas(_escape_inner_quotes(block)),
|
||
closed,
|
||
_strip_trailing_commas(closed),
|
||
_escape_inner_quotes(closed),
|
||
_strip_trailing_commas(_escape_inner_quotes(closed)),
|
||
]
|
||
|
||
last_err = None
|
||
for attempt in attempts:
|
||
try:
|
||
return json.loads(attempt)
|
||
except json.JSONDecodeError as e:
|
||
last_err = e
|
||
continue
|
||
raise ValueError(f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}")
|