gwoe-antragspruefer/app/adapters/qwen_bewerter.py

112 lines
4.0 KiB
Python
Raw Permalink Normal View History

"""QwenBewerter — Produktions-Adapter für den LlmBewerter-Port.
Kapselt den ``AsyncOpenAI``-Client gegen die DashScope-API, den Retry-
Loop mit Temperatur-Escalation und das Markdown-Fence-Stripping. Die
Retry-Semantik bleibt identisch zu ``analyzer.py`` vor der Migration:
bis zu ``max_retries`` Versuche, Temperatur steigt um 0.1 pro Versuch.
Der Adapter gibt den geparsten ``dict`` zurück Pydantic-Validierung,
Citation-Binding und Missing-Programme-Check bleiben Sache des Callers
in ``analyzer.py``.
"""
from __future__ import annotations
import hashlib
import json
import logging
from typing import Optional
from ..config import settings
from ..ports.llm_bewerter import LlmRequest
logger = logging.getLogger(__name__)
def _content_fingerprint(content: str) -> str:
"""Log-sicherer Identifier ohne PII-Leak (Issue #57 Befund #4)."""
if not content:
return "len=0"
h = hashlib.sha1(content.encode("utf-8", errors="replace")).hexdigest()[:8]
return f"len={len(content)} sha1={h}"
def _strip_markdown_fences(content: str) -> str:
"""Entfernt Markdown-Code-Fences, die Qwen trotz Prompt manchmal ergänzt.
In Sync mit ``analyzer.py`` vor der Migration; Einheitstests in
``tests/test_analyzer.py`` spiegeln exakt diese Logik.
"""
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[1]
if content.endswith("```"):
content = content.rsplit("```", 1)[0]
if content.startswith("```json"):
content = content[7:]
return content.strip()
class QwenBewerter:
"""LlmBewerter-Adapter für Qwen Plus (via DashScope)."""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
client=None,
) -> None:
"""Konstruktor-Injection erlaubt Tests, einen Mock-Client zu reichen
ohne Netzwerk-Zugriff. Prod nutzt den Default: Settings aus
``config.py`` + ``AsyncOpenAI``."""
self._api_key = api_key or settings.dashscope_api_key
self._base_url = base_url or settings.dashscope_base_url
self._client = client # lazy-created in .bewerte() wenn nicht gesetzt
def _get_client(self):
if self._client is not None:
return self._client
# Lazy-Import, damit die Test-Suite ohne ``openai``-Paket laufen kann.
from openai import AsyncOpenAI
self._client = AsyncOpenAI(api_key=self._api_key, base_url=self._base_url)
return self._client
async def bewerte(self, request: LlmRequest) -> dict:
"""Führt den LLM-Call aus, bis JSON-Parse klappt oder Retries erschöpft."""
client = self._get_client()
last_error: Optional[Exception] = None
for attempt in range(request.max_retries):
response = await client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.user_prompt},
],
temperature=request.base_temperature + (attempt * 0.1),
max_tokens=request.max_tokens,
)
content = response.choices[0].message.content.strip()
content = _strip_markdown_fences(content)
try:
return json.loads(content)
except json.JSONDecodeError as e:
last_error = e
logger.warning(
"LLM JSON parse error attempt %d/%d (%s) — content %s",
attempt + 1, request.max_retries, e,
_content_fingerprint(content),
)
if attempt >= request.max_retries - 1:
logger.error(
"LLM JSON parsing exhausted retries, content %s",
_content_fingerprint(content),
)
raise
# Unreachable — letzter Versuch hat raised. Für Typcheck.
assert last_error is not None
raise last_error