podcast-mindmap/scripts/json_utils.py

323 lines
10 KiB
Python
Raw Normal View History

"""Robuster JSON-Parser fuer LLM-Antworten.
Behebt typische Probleme:
- Markdown-Codefences (```json ... ```)
- Vorspann/Nachspann ausserhalb des JSON-Blocks
- Trailing commas
- Unescaped quotes innerhalb von Strings (heuristisch)
- Smart-Quotes
"""
import json
import re
from typing import Any, Optional
def _strip_codefence(s: str) -> str:
s = s.strip()
if s.startswith("```"):
# entferne erste Zeile ```... und schliessende ```
s = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n?", "", s)
s = re.sub(r"\n?```\s*$", "", s)
return s.strip()
def _find_balanced(s: str, open_char: str, close_char: str) -> Optional[str]:
"""Extrahiere ersten balancierten {...} oder [...]-Block, respektiert String-Literals.
Wenn keine vollstaendige Balance erreicht wird (truncated JSON), wird der bis zum
Ende verfuegbare Block zurueckgegeben das Repair-Pipeline-Stadium kann den dann
ggf. ergaenzen.
"""
start = s.find(open_char)
if start == -1:
return None
depth = 0
in_str = False
esc = False
for i in range(start, len(s)):
c = s[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
continue
if c == open_char:
depth += 1
elif c == close_char:
depth -= 1
if depth == 0:
return s[start:i + 1]
# Truncated: gib trotzdem den bisher gesehenen Block zurueck
return s[start:]
def _close_truncated(block: str, open_char: str, close_char: str) -> str:
"""Schliesst einen abgeschnittenen JSON-Block heuristisch.
Ansatz:
1. Scanne Zeichen, tracke (in_string, esc, depth).
2. Wenn am Ende ein String offen ist: schliesse mit ".
3. Schneide einen evtl. unvollstaendigen Wert-Tail nach dem letzten
sicheren Komma/Open-Brace/Close-Brace.
4. Ergaenze fehlende } / ] entsprechend depth.
"""
s = block
in_str = False
esc = False
depth = 0
# last_safe = Position direkt nach einem komplett-abgeschlossenen Element
# (komma, open, close), das heisst: wir koennen dort ohne Datenverlust schneiden.
last_safe = 0
for i, c in enumerate(s):
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
last_safe = i + 1
continue
if c == '"':
in_str = True
continue
if c in "{[":
depth += 1
last_safe = i + 1
elif c in "}]":
depth -= 1
last_safe = i + 1
elif c == ",":
last_safe = i # vor dem Komma ist sicher
elif c == ":":
# Doppelpunkt: kein safe-cut hier
pass
elif not c.isspace():
# Wert-Token (Zahl, true/false/null)
last_safe = i + 1
# Falls String am Ende offen: alle Zeichen behalten, am Ende " ergaenzen.
# Sonst: schneiden auf last_safe (entfernt unvollstaendige Werte/Keys).
if in_str:
# String einfach schliessen, lass Inhalt drin
s = s + '"'
else:
s = s[:last_safe] if last_safe > 0 else s
# Trailing whitespace + comma entfernen
s = re.sub(r"[\s,]+$", "", s)
# Pruefe ob letzter Token ein Key ohne Wert ist: "..." am Ende vor depth-close
# Pattern: ... "key" oder ... "key": (ohne Wert) -> entferne diesen unfertigen Eintrag
# Naive Heuristik: wenn der Inhalt mit "key" oder "key": endet ohne folgenden Wert,
# schneide bis zum letzten , oder { vor dieser Stelle.
# Recompute depth nach den Aenderungen
depth = 0
in_str = False
esc = False
for c in s:
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
depth += 1
elif c in "}]":
depth -= 1
# Wenn wir mit "key" oder "key": (ohne Wert!) enden, schneide bis vorheriger ,/{.
# Wichtig: nur wenn vor diesem `"..."` ein `,` oder `{` (also Key-Position) liegt,
# nicht wenn ein `:` (Wert-Position) liegt.
tail_match = re.search(r'("[^"]*")(\s*:?)\s*$', s)
if tail_match and not s.rstrip().endswith(("}", "]")):
before = s[:tail_match.start()].rstrip()
prev_char = before[-1] if before else ""
# Nur trimmen, wenn dies ein Key ohne Wert ist (vor sich , oder {)
if prev_char in ",{":
cut = max(s.rfind(",", 0, tail_match.start()), s.rfind("{", 0, tail_match.start()))
if cut > 0:
s = s[:cut].rstrip().rstrip(",")
# depth neu berechnen
depth = 0
in_str = False
esc = False
for c in s:
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
depth += 1
elif c in "}]":
depth -= 1
# Fehlende Klammern ergaenzen — kann gemischt sein, einfach von rechts pruefen
# was offen ist.
# Wir wissen: am Anfang ist open_char, depth zaehlt {[ +1 und }] -1.
# Fuer korrektes Schliessen muessen wir die Reihenfolge der offenen
# Klammern kennen. Vereinfachung: zaehle separat.
open_curly = s.count("{") - s.count("}")
open_brack = s.count("[") - s.count("]")
# Annahme: schliessende Klammern in umgekehrter Reihenfolge der oeffnenden
# Naive: suche letzte offene Klammer und schliesse damit.
while open_curly > 0 or open_brack > 0:
# finde letzte offene Klammer im String (ausserhalb von strings)
last_open = None
in_str = False
esc = False
for i, c in enumerate(s):
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c in "{[":
last_open = (i, c)
if last_open is None:
break
# schliesse die zuletzt geoeffnete (innerste am rechten Rand)
# Aber: koennten dazwischen schon geschlossene sein. Vereinfacht:
# schliesse ab Ende.
if open_curly > 0 and (open_brack == 0 or last_open[1] == "{"):
s += "}"
open_curly -= 1
elif open_brack > 0:
s += "]"
open_brack -= 1
else:
break
return s
def _normalize_quotes(s: str) -> str:
# Ersetze typografische Anfuehrungszeichen durch ASCII (nur ausserhalb von String-Werten heikel,
# aber pragmatisch: Modelle setzen sie fast nur als Begrenzer falsch).
return (s.replace("", '"').replace("", '"')
.replace("", '"').replace("", '"')
.replace("", "'").replace("", "'"))
def _strip_trailing_commas(s: str) -> str:
return re.sub(r",(\s*[}\]])", r"\1", s)
def _escape_inner_quotes(block: str) -> str:
"""Heuristik: in JSON-Strings unescaped " in escaped \" umwandeln.
Idee: Wir scannen Token fuer Token. Wenn wir in einem String sind und ein " auftritt,
pruefen wir, ob danach ein Strukturzeichen (`,`, `}`, `]`, `:` mit moeglichem Whitespace)
folgt. Wenn nicht, ist es ein eingebettetes Anfuehrungszeichen und wird escaped.
"""
out = []
in_str = False
esc = False
i = 0
while i < len(block):
c = block[i]
if not in_str:
out.append(c)
if c == '"':
in_str = True
i += 1
continue
# in_str = True
if esc:
out.append(c)
esc = False
i += 1
continue
if c == "\\":
out.append(c)
esc = True
i += 1
continue
if c == '"':
# Schau voraus: erlaubt nur whitespace + [,}\]:]
j = i + 1
while j < len(block) and block[j] in " \t\r\n":
j += 1
if j >= len(block) or block[j] in ",}]:":
# echtes Stringende
out.append(c)
in_str = False
else:
# eingebettetes Quote -> escapen
out.append("\\\"")
i += 1
continue
out.append(c)
i += 1
return "".join(out)
def parse_llm_json(content: str, expect: str = "object") -> Any:
"""Parst eine LLM-Antwort robust als JSON.
Args:
content: Rohantwort des Modells.
expect: 'object' oder 'array'.
Returns:
geparstes Python-Objekt.
Raises:
ValueError, wenn nichts geparst werden konnte.
"""
if content is None:
raise ValueError("leere Antwort")
s = _normalize_quotes(_strip_codefence(content))
open_c, close_c = ("{", "}") if expect == "object" else ("[", "]")
block = _find_balanced(s, open_c, close_c)
if block is None:
# Fallback: vielleicht steht doch das andere Format drin
alt_open, alt_close = ("[", "]") if expect == "object" else ("{", "}")
block = _find_balanced(s, alt_open, alt_close)
if block is None:
raise ValueError(f"kein {expect} gefunden in: {content[:200]}")
closed = _close_truncated(block, open_c, close_c)
attempts = [
block,
_strip_trailing_commas(block),
_escape_inner_quotes(block),
_strip_trailing_commas(_escape_inner_quotes(block)),
closed,
_strip_trailing_commas(closed),
_escape_inner_quotes(closed),
_strip_trailing_commas(_escape_inner_quotes(closed)),
]
last_err = None
for attempt in attempts:
try:
return json.loads(attempt)
except json.JSONDecodeError as e:
last_err = e
continue
raise ValueError(f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}")