323 lines
10 KiB
Python
323 lines
10 KiB
Python
|
|
"""Robust JSON parser for LLM responses.

Repairs typical problems:
- Markdown code fences (```json ... ```)
- Preamble/trailing text outside the JSON block
- Trailing commas
- Unescaped quotes inside strings (heuristic)
- Smart quotes
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
from typing import Any, Optional
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _strip_codefence(s: str) -> str:
    """Remove a surrounding Markdown code fence from *s*.

    The text is trimmed first. Only when the trimmed text begins with a
    fence (three backticks, optionally followed by a language tag) are the
    opening fence line and a closing fence at the very end removed. Text
    that does not start with a fence is returned trimmed but otherwise
    untouched.
    """
    text = s.strip()
    if not text.startswith("```"):
        return text

    # Drop the opening fence including an optional language tag ...
    without_opening = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n?", "", text)
    # ... and the closing fence, if it is the last thing in the text.
    without_closing = re.sub(r"\n?```\s*$", "", without_opening)
    return without_closing.strip()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _find_balanced(s: str, open_char: str, close_char: str) -> Optional[str]:
    """Return the first balanced ``open_char ... close_char`` block in *s*.

    Scanning starts at the first occurrence of *open_char*. Bracket
    characters inside double-quoted string literals (including after
    backslash escapes) are ignored.

    Returns:
        The balanced block, or everything from the first *open_char* to the
        end of *s* when the nesting never returns to zero (truncated JSON),
        or ``None`` when *open_char* does not occur at all.
    """
    begin = s.find(open_char)
    if begin < 0:
        return None

    nesting = 0
    inside_string = False
    escaped = False
    for offset, ch in enumerate(s[begin:]):
        if inside_string:
            if escaped:
                # The character after a backslash never ends the string.
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                inside_string = False
        elif ch == '"':
            inside_string = True
        elif ch == open_char:
            nesting += 1
        elif ch == close_char:
            nesting -= 1
            if nesting == 0:
                return s[begin:begin + offset + 1]

    # Truncated input: hand back what is there so a later stage can repair it.
    return s[begin:]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _close_truncated(block: str, open_char: str, close_char: str) -> str:
    """Heuristically close a JSON block that was cut off mid-stream.

    Approach:
      1. Scan the text once, tracking string/escape state and a stack of
         the containers (``{`` / ``[``) that are still open.
      2. If the text ends inside a string, drop a dangling escape (a lone
         backslash or an incomplete ``\\uXXXX``) and close the string.
         Otherwise cut back to the end of the last complete token.
      3. If the innermost open container is an object and the text ends
         with a key that has no value, remove that key.
      4. Complete a truncated ``true`` / ``false`` / ``null`` and drop a
         dangling number tail such as ``1.`` or ``1e-``.
      5. Append the missing closers in reverse order of the open stack.

    Args:
        block: Possibly truncated JSON text.
        open_char: Not used by the repair; kept for interface compatibility.
        close_char: Not used by the repair; kept for interface compatibility.

    Returns:
        The repaired text. Input that is already balanced is returned
        unchanged (apart from trailing whitespace/commas after the last
        complete token). The result is not guaranteed to be valid JSON.
    """
    stack = []           # open containers, innermost last
    in_str = False
    esc = False
    esc_at = -1          # index of the backslash that started the last escape
    last_safe = 0        # index just past the last complete token
    prev_sig = ""        # last significant character seen outside a string
    str_start = -1       # start index of the most recent string literal
    str_prev = ""        # significant character preceding that literal
    tail_is_str = False  # True when the last value-like token is a string

    for i, c in enumerate(block):
        if in_str:
            if esc:
                esc = False
            elif c == "\\":
                esc = True
                esc_at = i
            elif c == '"':
                in_str = False
                last_safe = i + 1
                prev_sig = '"'
                tail_is_str = True
            continue
        if c.isspace():
            continue
        if c == '"':
            in_str = True
            str_start = i
            str_prev = prev_sig
            continue
        if c in "{[":
            stack.append(c)
            last_safe = i + 1
            tail_is_str = False
        elif c in "}]":
            if stack:
                stack.pop()
            last_safe = i + 1
            tail_is_str = False
        elif c == ",":
            # Cutting before the comma is safe; cutting after it is not.
            last_safe = i
        elif c != ":":
            # Part of a bare value token (number, true/false/null).
            last_safe = i + 1
            tail_is_str = False
        prev_sig = c

    s = block
    if in_str:
        if esc:
            # A lone trailing backslash would escape the quote we append.
            s = s[:-1]
        elif (
            esc_at > str_start
            and s[esc_at + 1:esc_at + 2] == "u"
            and len(s) - esc_at < 6
        ):
            # Incomplete \uXXXX escape: remove it entirely.
            s = s[:esc_at]
        s += '"'
        tail_is_str = True
    elif last_safe > 0:
        s = s[:last_safe]

    s = re.sub(r"[\s,]+$", "", s)

    # A trailing string directly after "{" or "," inside an object is a key
    # without a value. Inside an array the same string is a complete element
    # and must be kept.
    if tail_is_str and stack and stack[-1] == "{" and str_prev in ("{", ","):
        s = re.sub(r"[\s,]+$", "", s[:str_start])

    # Complete a truncated literal; only a proper prefix is extended.
    word = re.search(r"(?<![\w\"])[a-z]+$", s)
    if word:
        fragment = word.group()
        for literal in ("true", "false", "null"):
            if literal != fragment and literal.startswith(fragment):
                s += literal[len(fragment):]
                break

    # "1." / "1e" / "1e-" are not valid numbers; keep the valid prefix.
    s = re.sub(r"(?<=\d)(?:\.|[eE][-+]?)$", "", s)

    closers = "".join("}" if opener == "{" else "]" for opener in reversed(stack))
    return s + closers
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _normalize_quotes(s: str) -> str:
    """Map typographic quotation marks in *s* to their ASCII counterparts.

    Double variants become ``"`` and single variants become ``'``. The
    replacement is applied everywhere, including inside string values; this
    is a pragmatic choice because models mostly misuse these characters as
    delimiters.
    """
    ascii_for = {
        "“": '"',
        "”": '"',
        "„": '"',
        "‟": '"',
        "‘": "'",
        "’": "'",
    }
    return s.translate(str.maketrans(ascii_for))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _strip_trailing_commas(s: str) -> str:
    """Remove commas that directly precede a closing ``}`` or ``]``.

    Whitespace between the comma and the closer is preserved. Unlike a
    plain regex substitution, the scan tracks double-quoted string literals
    (with backslash escapes), so a ``,}`` or ``,]`` sequence inside a string
    value is left untouched.
    """
    out = []
    in_str = False
    esc = False
    length = len(s)
    for i, c in enumerate(s):
        if in_str:
            out.append(c)
            if esc:
                esc = False
            elif c == "\\":
                esc = True
            elif c == '"':
                in_str = False
            continue
        if c == '"':
            in_str = True
        elif c == ",":
            # Look past whitespace: a closer next means the comma is trailing.
            j = i + 1
            while j < length and s[j].isspace():
                j += 1
            if j < length and s[j] in "}]":
                continue
        out.append(c)
    return "".join(out)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _escape_inner_quotes(block: str) -> str:
    """Heuristically turn unescaped ``"`` inside JSON strings into ``\\"``.

    While inside a string, every ``"`` is checked against what follows it:
    if, after optional whitespace, the next character is one of ``,``,
    ``}``, ``]``, ``:`` or the text ends, the quote is taken as the real end
    of the string. Any other quote is treated as embedded and gets a
    backslash in front of it. Existing backslash escapes are copied through
    unchanged.
    """
    pieces = []
    inside = False
    escaped = False
    total = len(block)

    for pos, ch in enumerate(block):
        if not inside:
            pieces.append(ch)
            if ch == '"':
                inside = True
            continue

        if escaped:
            # Character following a backslash: copy verbatim.
            pieces.append(ch)
            escaped = False
            continue

        if ch == "\\":
            pieces.append(ch)
            escaped = True
            continue

        if ch != '"':
            pieces.append(ch)
            continue

        # A quote inside a string: peek past whitespace to classify it.
        peek = pos + 1
        while peek < total and block[peek] in " \t\r\n":
            peek += 1
        if peek >= total or block[peek] in ",}]:":
            # Followed by a structural character: genuine end of string.
            pieces.append(ch)
            inside = False
        else:
            # Embedded quote: escape it.
            pieces.append("\\\"")

    return "".join(pieces)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _repair_variants(block: str, open_c: str, close_c: str):
    """Lazily yield progressively more aggressive repairs of *block*."""
    yield block
    yield _strip_trailing_commas(block)
    escaped = _escape_inner_quotes(block)
    yield escaped
    yield _strip_trailing_commas(escaped)
    closed = _close_truncated(block, open_c, close_c)
    yield closed
    yield _strip_trailing_commas(closed)
    closed_escaped = _escape_inner_quotes(closed)
    yield closed_escaped
    yield _strip_trailing_commas(closed_escaped)


def parse_llm_json(content: str, expect: str = "object") -> Any:
    """Parse an LLM response robustly as JSON.

    The response is stripped of a surrounding code fence, the first
    bracketed block is extracted, and a series of repairs (trailing commas,
    unescaped inner quotes, closing truncated output) is tried until
    ``json.loads`` succeeds. The text is tried as-is first; only if that
    fails is a variant with typographic quotes mapped to ASCII tried, so
    valid JSON with typographic quotes inside string values is not altered.

    Args:
        content: Raw model response.
        expect: ``'object'`` to look for ``{...}`` first; any other value
            looks for ``[...]`` first. The other bracket kind is used as a
            fallback when the preferred one does not occur.

    Returns:
        The parsed Python object.

    Raises:
        ValueError: If *content* is ``None``, contains no bracketed block,
            or none of the repair attempts yields valid JSON.
    """
    if content is None:
        raise ValueError("leere Antwort")

    stripped = _strip_codefence(content)
    normalized = _normalize_quotes(stripped)
    # Try the untouched text first; the normalised one only if it differs.
    sources = [stripped] if normalized == stripped else [stripped, normalized]

    if expect == "object":
        bracket_pairs = (("{", "}"), ("[", "]"))
    else:
        bracket_pairs = (("[", "]"), ("{", "}"))

    found = []
    for text in sources:
        for open_c, close_c in bracket_pairs:
            block = _find_balanced(text, open_c, close_c)
            if block is not None:
                found.append((block, open_c, close_c))
                break
    if not found:
        raise ValueError(f"kein {expect} gefunden in: {content[:200]}")

    last_err = None
    for block, open_c, close_c in found:
        for attempt in _repair_variants(block, open_c, close_c):
            try:
                return json.loads(attempt)
            except json.JSONDecodeError as e:
                last_err = e
    raise ValueError(
        f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}"
    ) from last_err
|