"""Robuster JSON-Parser fuer LLM-Antworten. Behebt typische Probleme: - Markdown-Codefences (```json ... ```) - Vorspann/Nachspann ausserhalb des JSON-Blocks - Trailing commas - Unescaped quotes innerhalb von Strings (heuristisch) - Smart-Quotes """ import json import re from typing import Any, Optional def _strip_codefence(s: str) -> str: s = s.strip() if s.startswith("```"): # entferne erste Zeile ```... und schliessende ``` s = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n?", "", s) s = re.sub(r"\n?```\s*$", "", s) return s.strip() def _find_balanced(s: str, open_char: str, close_char: str) -> Optional[str]: """Extrahiere ersten balancierten {...} oder [...]-Block, respektiert String-Literals. Wenn keine vollstaendige Balance erreicht wird (truncated JSON), wird der bis zum Ende verfuegbare Block zurueckgegeben — das Repair-Pipeline-Stadium kann den dann ggf. ergaenzen. """ start = s.find(open_char) if start == -1: return None depth = 0 in_str = False esc = False for i in range(start, len(s)): c = s[i] if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True continue if c == open_char: depth += 1 elif c == close_char: depth -= 1 if depth == 0: return s[start:i + 1] # Truncated: gib trotzdem den bisher gesehenen Block zurueck return s[start:] def _close_truncated(block: str, open_char: str, close_char: str) -> str: """Schliesst einen abgeschnittenen JSON-Block heuristisch. Ansatz: 1. Scanne Zeichen, tracke (in_string, esc, depth). 2. Wenn am Ende ein String offen ist: schliesse mit ". 3. Schneide einen evtl. unvollstaendigen Wert-Tail nach dem letzten sicheren Komma/Open-Brace/Close-Brace. 4. Ergaenze fehlende } / ] entsprechend depth. """ s = block in_str = False esc = False depth = 0 # last_safe = Position direkt nach einem komplett-abgeschlossenen Element # (komma, open, close), das heisst: wir koennen dort ohne Datenverlust schneiden. last_safe = 0 for i, c in enumerate(s): if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False last_safe = i + 1 continue if c == '"': in_str = True continue if c in "{[": depth += 1 last_safe = i + 1 elif c in "}]": depth -= 1 last_safe = i + 1 elif c == ",": last_safe = i # vor dem Komma ist sicher elif c == ":": # Doppelpunkt: kein safe-cut hier pass elif not c.isspace(): # Wert-Token (Zahl, true/false/null) last_safe = i + 1 # Falls String am Ende offen: alle Zeichen behalten, am Ende " ergaenzen. # Sonst: schneiden auf last_safe (entfernt unvollstaendige Werte/Keys). if in_str: # String einfach schliessen, lass Inhalt drin s = s + '"' else: s = s[:last_safe] if last_safe > 0 else s # Trailing whitespace + comma entfernen s = re.sub(r"[\s,]+$", "", s) # Pruefe ob letzter Token ein Key ohne Wert ist: "..." am Ende vor depth-close # Pattern: ... "key" oder ... "key": (ohne Wert) -> entferne diesen unfertigen Eintrag # Naive Heuristik: wenn der Inhalt mit "key" oder "key": endet ohne folgenden Wert, # schneide bis zum letzten , oder { vor dieser Stelle. # Recompute depth nach den Aenderungen depth = 0 in_str = False esc = False for c in s: if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c in "{[": depth += 1 elif c in "}]": depth -= 1 # Wenn wir mit "key" oder "key": (ohne Wert!) enden, schneide bis vorheriger ,/{. # Wichtig: nur wenn vor diesem `"..."` ein `,` oder `{` (also Key-Position) liegt, # nicht wenn ein `:` (Wert-Position) liegt. tail_match = re.search(r'("[^"]*")(\s*:?)\s*$', s) if tail_match and not s.rstrip().endswith(("}", "]")): before = s[:tail_match.start()].rstrip() prev_char = before[-1] if before else "" # Nur trimmen, wenn dies ein Key ohne Wert ist (vor sich , oder {) if prev_char in ",{": cut = max(s.rfind(",", 0, tail_match.start()), s.rfind("{", 0, tail_match.start())) if cut > 0: s = s[:cut].rstrip().rstrip(",") # depth neu berechnen depth = 0 in_str = False esc = False for c in s: if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c in "{[": depth += 1 elif c in "}]": depth -= 1 # Fehlende Klammern ergaenzen — kann gemischt sein, einfach von rechts pruefen # was offen ist. # Wir wissen: am Anfang ist open_char, depth zaehlt {[ +1 und }] -1. # Fuer korrektes Schliessen muessen wir die Reihenfolge der offenen # Klammern kennen. Vereinfachung: zaehle separat. open_curly = s.count("{") - s.count("}") open_brack = s.count("[") - s.count("]") # Annahme: schliessende Klammern in umgekehrter Reihenfolge der oeffnenden # Naive: suche letzte offene Klammer und schliesse damit. while open_curly > 0 or open_brack > 0: # finde letzte offene Klammer im String (ausserhalb von strings) last_open = None in_str = False esc = False for i, c in enumerate(s): if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c in "{[": last_open = (i, c) if last_open is None: break # schliesse die zuletzt geoeffnete (innerste am rechten Rand) # Aber: koennten dazwischen schon geschlossene sein. Vereinfacht: # schliesse ab Ende. if open_curly > 0 and (open_brack == 0 or last_open[1] == "{"): s += "}" open_curly -= 1 elif open_brack > 0: s += "]" open_brack -= 1 else: break return s def _normalize_quotes(s: str) -> str: # Ersetze typografische Anfuehrungszeichen durch ASCII (nur ausserhalb von String-Werten heikel, # aber pragmatisch: Modelle setzen sie fast nur als Begrenzer falsch). return (s.replace("“", '"').replace("”", '"') .replace("„", '"').replace("‟", '"') .replace("‘", "'").replace("’", "'")) def _strip_trailing_commas(s: str) -> str: return re.sub(r",(\s*[}\]])", r"\1", s) def _escape_inner_quotes(block: str) -> str: """Heuristik: in JSON-Strings unescaped " in escaped \" umwandeln. Idee: Wir scannen Token fuer Token. Wenn wir in einem String sind und ein " auftritt, pruefen wir, ob danach ein Strukturzeichen (`,`, `}`, `]`, `:` mit moeglichem Whitespace) folgt. Wenn nicht, ist es ein eingebettetes Anfuehrungszeichen und wird escaped. """ out = [] in_str = False esc = False i = 0 while i < len(block): c = block[i] if not in_str: out.append(c) if c == '"': in_str = True i += 1 continue # in_str = True if esc: out.append(c) esc = False i += 1 continue if c == "\\": out.append(c) esc = True i += 1 continue if c == '"': # Schau voraus: erlaubt nur whitespace + [,}\]:] j = i + 1 while j < len(block) and block[j] in " \t\r\n": j += 1 if j >= len(block) or block[j] in ",}]:": # echtes Stringende out.append(c) in_str = False else: # eingebettetes Quote -> escapen out.append("\\\"") i += 1 continue out.append(c) i += 1 return "".join(out) def parse_llm_json(content: str, expect: str = "object") -> Any: """Parst eine LLM-Antwort robust als JSON. Args: content: Rohantwort des Modells. expect: 'object' oder 'array'. Returns: geparstes Python-Objekt. Raises: ValueError, wenn nichts geparst werden konnte. """ if content is None: raise ValueError("leere Antwort") s = _normalize_quotes(_strip_codefence(content)) open_c, close_c = ("{", "}") if expect == "object" else ("[", "]") block = _find_balanced(s, open_c, close_c) if block is None: # Fallback: vielleicht steht doch das andere Format drin alt_open, alt_close = ("[", "]") if expect == "object" else ("{", "}") block = _find_balanced(s, alt_open, alt_close) if block is None: raise ValueError(f"kein {expect} gefunden in: {content[:200]}") closed = _close_truncated(block, open_c, close_c) attempts = [ block, _strip_trailing_commas(block), _escape_inner_quotes(block), _strip_trailing_commas(_escape_inner_quotes(block)), closed, _strip_trailing_commas(closed), _escape_inner_quotes(closed), _strip_trailing_commas(_escape_inner_quotes(closed)), ] last_err = None for attempt in attempts: try: return json.loads(attempt) except json.JSONDecodeError as e: last_err = e continue raise ValueError(f"JSON-Parse fehlgeschlagen nach Repair-Versuchen: {last_err}; raw={content[:300]}")