refactor(#136): DDD-Lightweight Tag 1-4 (Ports, Adapter, Repositories, Domain-Verhalten)

ADR 0008: Lightweight-Migration ohne Package-Split

- ports/llm_bewerter.py: Protocol + LlmRequest-Dataclass
- adapters/qwen_bewerter.py: Qwen/DashScope-Adapter mit Retry-Loop
- repositories/{antrag,bewertung,abonnement}_repository.py: Protocol + Sqlite-Impl + InMemory-Fake
- analyzer.py refactored: nimmt Optional[LlmBewerter], AsyncOpenAI-Import raus
- models.py: 5 Domain-Methoden auf Bewertung/MatrixEntry
  (ist_ablehnung, hat_fundamental_kritisches_feld, verletzt_score_cap, ...)
- analyzer loggt WARNING wenn LLM Score-Cap-Invariante verletzt

Folge-PR: Callsite-Migration in main.py (~21 direkte database.*-Aufrufe)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dotty Dotter 2026-04-25 20:55:16 +02:00
parent 2c0e94d29d
commit 8f0f6d6e32
11 changed files with 892 additions and 68 deletions

11
app/adapters/__init__.py Normal file
View File

@ -0,0 +1,11 @@
"""Adapter: konkrete Implementierungen der Ports.
Vorläufig enthält dieses Modul nur den Qwen-LLM-Adapter. Perspektivisch
wandern die 17 Parlaments-Adapter aus ``parlamente.py`` hierher (eigener
Folge-PR, weil das eine umfangreichere Umstellung ist und die
Adapter-ABC dort bereits existiert siehe ADR 0002).
"""
from .qwen_bewerter import QwenBewerter
__all__ = ["QwenBewerter"]

View File

@ -0,0 +1,111 @@
"""QwenBewerter — Produktions-Adapter für den LlmBewerter-Port.
Kapselt den ``AsyncOpenAI``-Client gegen die DashScope-API, den Retry-
Loop mit Temperatur-Escalation und das Markdown-Fence-Stripping. Die
Retry-Semantik bleibt identisch zu ``analyzer.py`` vor der Migration:
bis zu ``max_retries`` Versuche, Temperatur steigt um 0.1 pro Versuch.
Der Adapter gibt den geparsten ``dict`` zurück Pydantic-Validierung,
Citation-Binding und Missing-Programme-Check bleiben Sache des Callers
in ``analyzer.py``.
"""
from __future__ import annotations
import hashlib
import json
import logging
from typing import Optional
from ..config import settings
from ..ports.llm_bewerter import LlmRequest
logger = logging.getLogger(__name__)
def _content_fingerprint(content: str) -> str:
"""Log-sicherer Identifier ohne PII-Leak (Issue #57 Befund #4)."""
if not content:
return "len=0"
h = hashlib.sha1(content.encode("utf-8", errors="replace")).hexdigest()[:8]
return f"len={len(content)} sha1={h}"
def _strip_markdown_fences(content: str) -> str:
"""Entfernt Markdown-Code-Fences, die Qwen trotz Prompt manchmal ergänzt.
In Sync mit ``analyzer.py`` vor der Migration; Einheitstests in
``tests/test_analyzer.py`` spiegeln exakt diese Logik.
"""
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[1]
if content.endswith("```"):
content = content.rsplit("```", 1)[0]
if content.startswith("```json"):
content = content[7:]
return content.strip()
class QwenBewerter:
"""LlmBewerter-Adapter für Qwen Plus (via DashScope)."""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
client=None,
) -> None:
"""Konstruktor-Injection erlaubt Tests, einen Mock-Client zu reichen
ohne Netzwerk-Zugriff. Prod nutzt den Default: Settings aus
``config.py`` + ``AsyncOpenAI``."""
self._api_key = api_key or settings.dashscope_api_key
self._base_url = base_url or settings.dashscope_base_url
self._client = client # lazy-created in .bewerte() wenn nicht gesetzt
def _get_client(self):
if self._client is not None:
return self._client
# Lazy-Import, damit die Test-Suite ohne ``openai``-Paket laufen kann.
from openai import AsyncOpenAI
self._client = AsyncOpenAI(api_key=self._api_key, base_url=self._base_url)
return self._client
async def bewerte(self, request: LlmRequest) -> dict:
"""Führt den LLM-Call aus, bis JSON-Parse klappt oder Retries erschöpft."""
client = self._get_client()
last_error: Optional[Exception] = None
for attempt in range(request.max_retries):
response = await client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.user_prompt},
],
temperature=request.base_temperature + (attempt * 0.1),
max_tokens=request.max_tokens,
)
content = response.choices[0].message.content.strip()
content = _strip_markdown_fences(content)
try:
return json.loads(content)
except json.JSONDecodeError as e:
last_error = e
logger.warning(
"LLM JSON parse error attempt %d/%d (%s) — content %s",
attempt + 1, request.max_retries, e,
_content_fingerprint(content),
)
if attempt >= request.max_retries - 1:
logger.error(
"LLM JSON parsing exhausted retries, content %s",
_content_fingerprint(content),
)
raise
# Unreachable — letzter Versuch hat raised. Für Typcheck.
assert last_error is not None
raise last_error

View File

@ -1,16 +1,23 @@
"""LLM-based analysis of parliamentary motions against GWÖ matrix.""" """LLM-based analysis of parliamentary motions against GWÖ matrix.
Seit ADR 0008: Die reinen LLM-Calls laufen über den ``LlmBewerter``-Port
(``app/ports/llm_bewerter.py``); der Default-Adapter ist ``QwenBewerter``
(``app/adapters/qwen_bewerter.py``). Citation-Binding, Missing-Programme-
Check und Pydantic-Validierung bleiben hier in der Application-Schicht.
"""
import hashlib import hashlib
import json import json
import logging import logging
import re import re
from pathlib import Path from pathlib import Path
from typing import Optional
from openai import AsyncOpenAI
from .config import settings from .config import settings
from .models import Assessment from .models import Assessment
from .bundeslaender import BUNDESLAENDER from .bundeslaender import BUNDESLAENDER
from .wahlprogramm_check import check_missing_programmes
from .ports.llm_bewerter import LlmBewerter, LlmRequest
from .wahlprogramme import ( from .wahlprogramme import (
find_relevant_quotes, find_relevant_quotes,
format_quote_for_prompt, format_quote_for_prompt,
@ -30,12 +37,28 @@ def _content_fingerprint(content: str) -> str:
"""Cheap, log-safe identifier for an LLM response: length + first 8 chars """Cheap, log-safe identifier for an LLM response: length + first 8 chars
of SHA-1. Lets us correlate retries without ever leaking the LLM's of SHA-1. Lets us correlate retries without ever leaking the LLM's
actual output (which may contain sensitive Antrags-Inhalte). Issue actual output (which may contain sensitive Antrags-Inhalte). Issue
#57 Befund #4.""" #57 Befund #4.
Wird nach ADR 0008 nur noch für post-LLM-Diagnostik (Pydantic-Validation)
gebraucht; der LLM-Retry-Loop selbst loggt in ``QwenBewerter``.
"""
if not content: if not content:
return "len=0" return "len=0"
h = hashlib.sha1(content.encode("utf-8", errors="replace")).hexdigest()[:8] h = hashlib.sha1(content.encode("utf-8", errors="replace")).hexdigest()[:8]
return f"len={len(content)} sha1={h}" return f"len={len(content)} sha1={h}"
def get_default_bewerter() -> LlmBewerter:
"""Lazy-Instanziierung des Default-Adapters.
Der Adapter-Import ist lazy, damit Tests ohne ``openai``-Paket und ohne
DashScope-Credentials laufen (das Stubbing in ``conftest.py`` reicht,
solange niemand Top-Level importiert).
"""
from .adapters.qwen_bewerter import QwenBewerter
return QwenBewerter()
# Load context files # Load context files
KONTEXT_DIR = Path(__file__).parent / "kontext" KONTEXT_DIR = Path(__file__).parent / "kontext"
@ -152,7 +175,11 @@ Antworte NUR mit einem JSON-Objekt im folgenden Format (keine Markdown-Codeblöc
"verbesserungspotenzial": "gering | mittel | hoch | fundamental", "verbesserungspotenzial": "gering | mittel | hoch | fundamental",
"themen": ["Bildung", "Soziales"], "themen": ["Bildung", "Soziales"],
"antragZusammenfassung": "1-2 Sätze Kernaussage", "antragZusammenfassung": "1-2 Sätze Kernaussage",
"antragKernpunkte": ["Punkt 1", "Punkt 2", "Punkt 3"] "antragKernpunkte": ["Punkt 1", "Punkt 2", "Punkt 3"],
"konfidenz": "hoch | mittel | niedrig",
"shareThreads": "Schlagkräftiger Post für Threads/Instagram (max 500 Zeichen). Emoji, Engagement, CTA, konkret auf den Antrag bezogen. Hashtags: #Gemeinwohl #GWÖ + 2-3 thematische.",
"shareTwitter": "Prägnanter Tweet für X/Twitter (max 280 Zeichen). Knackig, pointiert, mit Emoji und 2 Hashtags.",
"shareMastodon": "Sachlicher aber ansprechender Post für Mastodon (max 500 Zeichen). Informativ, quellenbasiert, mit Kontext."
} }
## Wichtige Regeln ## Wichtige Regeln
@ -165,7 +192,11 @@ Antworte NUR mit einem JSON-Objekt im folgenden Format (keine Markdown-Codeblöc
- Wenn EIN Feld -3 hat Gesamtscore maximal 4/10 - Wenn EIN Feld -3 hat Gesamtscore maximal 4/10
- Bei "Ablehnen" Score 0-2/10 - Bei "Ablehnen" Score 0-2/10
- Bei "Uneingeschränkt unterstützen" Score 8-10/10 - Bei "Uneingeschränkt unterstützen" Score 8-10/10
- **Matrix-Felder**: Bewertung -5 bis +5 (Symbole: / / / + / ++)""" - **Matrix-Felder**: Bewertung -5 bis +5 (Symbole: / / / + / ++)
- **Konfidenz**: Selbsteinschätzung der Bewertungssicherheit:
- "hoch": Antrag ist eindeutig, GWÖ-Bezug klar, genügend Kontext
- "mittel": Antrag ist mehrdeutig oder berührt Nischenthemen
- "niedrig": Antrag ist sehr kurz, unklar oder fachfremd Bewertung unsicher"""
def get_bundesland_context(bundesland: str) -> str: def get_bundesland_context(bundesland: str) -> str:
@ -220,13 +251,30 @@ Bei Oppositionsanträgen: Bewerte zusätzlich, ob die Regierungsfraktionen zusti
""" """
async def analyze_antrag(text: str, bundesland: str = "NRW", model: str = "qwen-plus") -> Assessment: async def analyze_antrag(
"""Analyze a parliamentary motion using the LLM.""" text: str,
bundesland: str = "NRW",
model: str = "qwen-plus",
bewerter: Optional[LlmBewerter] = None,
) -> Assessment:
"""Analyze a parliamentary motion using the LLM.
client = AsyncOpenAI( Args:
api_key=settings.dashscope_api_key, text: Antrag-Volltext (plain).
base_url=settings.dashscope_base_url, bundesland: BL-Code aus ``bundeslaender.py``.
) model: LLM-Modell (wird vom Default-Adapter ``QwenBewerter``
akzeptiert; andere Adapter können eigene Modell-Namen nutzen).
bewerter: ``LlmBewerter``-Implementierung. Default: ``QwenBewerter``
(DashScope/Qwen). Tests reichen hier ``FakeLlmBewerter``.
Nach ADR 0008: der HTTP-Call samt Retry-Loop lebt im Adapter; hier
bleibt nur noch die Application-Logik (Prompt-Komposition, Semantic-
Search, Citation-Binding, Missing-Programme-Check, Pydantic-Validation
und Domain-Invarianten-Warnings).
"""
if bewerter is None:
bewerter = get_default_bewerter()
system_prompt = get_system_prompt() system_prompt = get_system_prompt()
bundesland_context = get_bundesland_context(bundesland) bundesland_context = get_bundesland_context(bundesland)
@ -303,58 +351,46 @@ Programme bekannter sind. Findest du oben für eine Partei keinen passenden Chun
Ausgabe als reines JSON ohne Markdown-Codeblöcke.""" Ausgabe als reines JSON ohne Markdown-Codeblöcke."""
# Retry loop for JSON parsing errors # LLM-Call über den Port. Retry-Loop + Markdown-Stripping wohnen im
max_retries = 3 # Adapter (``QwenBewerter``). Bei exhausted retries wirft er
last_error = None # json.JSONDecodeError — wir lassen das durchpropagieren wie vor der
# Migration.
request = LlmRequest(
system_prompt=system_prompt,
user_prompt=user_prompt,
model=model,
)
data = await bewerter.bewerte(request)
for attempt in range(max_retries): # Issue #60 Option B — server-side reconstruction of citation quelle/url
response = await client.chat.completions.create( # from the actually retrieved chunks, before Pydantic validation. Der LLM
model=model, # ist nicht mehr Quelle für die Quellen-Labels; wir ersetzen sie durch
messages=[ # das kanonische _chunk_source_label und droppen Zitate ohne Chunk-Match.
{"role": "system", "content": system_prompt}, if semantic_quotes:
{"role": "user", "content": user_prompt}, data = reconstruct_zitate(data, semantic_quotes)
],
temperature=0.3 + (attempt * 0.1), # Slightly increase temp on retry # #128: Fehlende Wahlprogramme server-seitig erkennen und eintragen. Der
max_tokens=4000, # LLM bekommt diese Information nicht — sie basiert auf der lokalen
# Registry, nicht auf dem LLM-Wissen.
missing = check_missing_programmes(bundesland, landtagsfraktionen)
if missing:
logger.warning(
"Fehlende Wahlprogramme für %s in %s: %s",
landtagsfraktionen, bundesland, missing,
)
data["fehlendeProgramme"] = missing
# Pydantic-Validation: harter Check auf Schema-Drift.
assessment = Assessment.model_validate(data)
# Tag-4-Invarianten-Warnings (ADR 0008): Verstöße gegen das Score-Cap
# werden geloggt, aber nicht geworfen — das LLM soll lernen, nicht der
# Produktivbetrieb brechen.
if assessment.verletzt_score_cap():
logger.warning(
"Assessment %s verletzt Score-Cap: gwoe_score=%.1f bei "
"fundamental-kritischem Matrix-Feld (rating≤-4)",
assessment.drucksache, assessment.gwoe_score,
) )
content = response.choices[0].message.content.strip() return assessment
# Remove markdown code blocks if present
if content.startswith("```"):
content = content.split("\n", 1)[1]
if content.endswith("```"):
content = content.rsplit("```", 1)[0]
if content.startswith("```json"):
content = content[7:]
content = content.strip()
try:
# Parse JSON
data = json.loads(content)
# Issue #60 Option B — server-side reconstruction of citation
# quelle/url from the actually retrieved chunks, before Pydantic
# validation. The LLM is no longer trusted for the citation source
# label; we replace it with the canonical _chunk_source_label of
# the chunk whose text actually contains the cited snippet, and
# drop any zitat that can't be located in any retrieved chunk.
if semantic_quotes:
data = reconstruct_zitate(data, semantic_quotes)
# Convert to Assessment model
return Assessment.model_validate(data)
except json.JSONDecodeError as e:
last_error = e
logger.warning(
"LLM JSON parse error attempt %d/%d (%s) — content %s",
attempt + 1, max_retries, e, _content_fingerprint(content),
)
if attempt < max_retries - 1:
continue
else:
# Letzter Fehlversuch — Fingerprint reicht zur Forensik;
# Volltext darf nicht ins Log, weil er Antrag-Inhalte enthält
logger.error(
"LLM JSON parsing exhausted retries, content %s",
_content_fingerprint(content),
)
raise

View File

@ -40,6 +40,37 @@ class MatrixEntry(BaseModel):
rating: int = Field(..., ge=-5, le=5) # Neue Skala: -5 bis +5 rating: int = Field(..., ge=-5, le=5) # Neue Skala: -5 bis +5
symbol: Optional[str] = None symbol: Optional[str] = None
# ─── Domain-Verhalten (ADR 0008) ──────────────────────────────────────
def ist_fundamental_kritisch(self) -> bool:
"""True, wenn das Feld einen fundamentalen Widerspruch zu
GWÖ-Werten beschreibt (rating -4).
Diese Regel triggert den Score-Cap: ein einziges fundamental-
kritisches Feld deckelt den Gesamt-Score auf 3/10 (siehe
``Assessment.verletzt_score_cap``).
"""
return self.rating <= -4
def to_symbol(self) -> str:
"""Berechnet das Matrix-Symbol aus dem Rating.
Quelle: analyzer.py System-Prompt Matrix-Feldwertung (Skala -5 bis +5)".
Der LLM liefert das Symbol heute selbst; diese Methode erlaubt
server-seitige Konsistenz-Prüfung und ist die Basis, um das
Symbol-Feld perspektivisch ganz aus dem LLM-Output zu entfernen.
"""
r = self.rating
if r >= 4:
return "++"
if r >= 1:
return "+"
if r == 0:
return ""
if r >= -3:
return ""
return ""
class Zitat(BaseModel): class Zitat(BaseModel):
text: str text: str
@ -99,9 +130,51 @@ class Assessment(BaseModel):
themen: list[str] = [] themen: list[str] = []
antrag_zusammenfassung: Optional[str] = Field(None, alias="antragZusammenfassung") antrag_zusammenfassung: Optional[str] = Field(None, alias="antragZusammenfassung")
antrag_kernpunkte: Optional[list[str]] = Field(None, alias="antragKernpunkte") antrag_kernpunkte: Optional[list[str]] = Field(None, alias="antragKernpunkte")
konfidenz: Optional[str] = Field(None, description="LLM-Selbsteinschätzung: hoch/mittel/niedrig")
share_threads: Optional[str] = Field(None, alias="shareThreads", description="Social-Post für Threads (max 500 Zeichen)")
share_twitter: Optional[str] = Field(None, alias="shareTwitter", description="Social-Post für X/Twitter (max 280 Zeichen)")
share_mastodon: Optional[str] = Field(None, alias="shareMastodon", description="Social-Post für Mastodon (max 500 Zeichen)")
# #128: Fraktionen ohne hinterlegtes Wahlprogramm — wird server-seitig
# nach dem LLM-Call befüllt, nicht vom LLM selbst.
fehlende_programme: Optional[list[str]] = Field(
default_factory=list,
alias="fehlendeProgramme",
description="Fraktionen ohne hinterlegtes Wahlprogramm für dieses Bundesland",
)
model_config = {"populate_by_name": True} model_config = {"populate_by_name": True}
# ─── Domain-Verhalten (ADR 0008) ──────────────────────────────────────
def ist_ablehnung(self) -> bool:
"""True, wenn die Empfehlung „Ablehnen" lautet."""
return self.empfehlung == Empfehlung.ABLEHNEN
def ist_uneingeschraenkt_unterstuetzend(self) -> bool:
"""True, wenn die Empfehlung „Uneingeschränkt unterstützen" lautet."""
return self.empfehlung == Empfehlung.UNEINGESCHRAENKT
def hat_fundamental_kritisches_feld(self) -> bool:
"""True, wenn mindestens ein Matrix-Feld rating ≤ -4 hat.
Basis für ``verletzt_score_cap``. Nutzt die VO-Methode
``MatrixEntry.ist_fundamental_kritisch``.
"""
return any(m.ist_fundamental_kritisch() for m in self.gwoe_matrix)
def verletzt_score_cap(self) -> bool:
"""Prüft die Regel aus dem System-Prompt:
Wenn ein Matrix-Feld rating -4 hat, ist Gesamt-Score max. 3/10.
Der LLM-Prompt formuliert diese Regel als Soll-Anweisung; sie kann
trotzdem verletzt werden. Diese Methode macht die Regel server-
seitig prüfbar und ist der Anker für die Warning-Logik in
``analyzer.py`` (Tag-4-Schritt der DDD-Lightweight-Migration).
"""
return self.hat_fundamental_kritisches_feld() and self.gwoe_score > 3.0
# --- Matrix constants --- # --- Matrix constants ---

11
app/ports/__init__.py Normal file
View File

@ -0,0 +1,11 @@
"""Ports (Protocols) für externe Dienste — Teil der Hexagonal-Migration (ADR 0008).
Ein Port" ist hier ein ``typing.Protocol``, das einen Infrastruktur-
Zugang beschreibt (LLM-Call, Embedding-Search, Mail-Versand) ohne
konkrete Implementierung. Adapter in ``app/adapters/`` implementieren
die Ports gegen reale Systeme; Tests nutzen Fake-Implementierungen.
"""
from .llm_bewerter import LlmBewerter, LlmRequest
__all__ = ["LlmBewerter", "LlmRequest"]

48
app/ports/llm_bewerter.py Normal file
View File

@ -0,0 +1,48 @@
"""LlmBewerter — Port für den LLM-Call in der Antragsbewertung.
Trennt die *Rohantwort* des LLMs (JSON-String) vom umgebenden
Application-Flow (Retry, Prompt-Composition, Citation-Binding). Die
Retry-Logik samt Temperatur-Escalation bleibt Adapter-Detail ein
zweiter Adapter (Claude, OpenAI-kompatible Proxies) kann eine ganz
andere Strategie wählen.
Ein späterer Tag-Schritt (Kapitel 10.5 der DDD-Bewertung) kapselt
zusätzlich die JSON-Parse-Kaskade hinter dem Port; heute bekommt der
Caller noch einen JSON-String zurück.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
@dataclass(frozen=True)
class LlmRequest:
"""Alles, was der Adapter zum Generieren der Bewertung braucht —
inkl. Retry-Verhalten auf der Adapter-Seite."""
system_prompt: str
user_prompt: str
model: str = "qwen-plus"
max_retries: int = 3
max_tokens: int = 4000
base_temperature: float = 0.3
@runtime_checkable
class LlmBewerter(Protocol):
"""Port: wandelt einen Prompt in einen JSON-String (LLM-Rohantwort).
Der Adapter kümmert sich um:
- Markdown-Fence-Entfernung,
- JSON-Parse-Retry mit steigender Temperatur,
- Content-Fingerprint-Logging zur Forensik.
Raises:
json.JSONDecodeError: wenn alle Retries scheitern. Höhere Schichten
behandeln das als Fehlschlag der Analyse.
"""
async def bewerte(self, request: LlmRequest) -> dict: ...

View File

@ -0,0 +1,44 @@
"""Repository-Pattern für Persistenz-Zugriff (ADR 0008).
Die Repositories kapseln direkte ``database.py``-Aufrufe hinter Protocols,
sodass Tests `InMemory*Repository` verwenden können und Callsites nicht
mehr jedes Schema-Detail kennen müssen.
Die konkreten `Sqlite*Repository`-Implementierungen delegieren heute noch
an die bestehenden Funktionen in ``database.py`` kein Big-Bang-Rewrite.
Schritt für Schritt wandern die direkten DB-Aufrufe in die Repositories.
"""
from .antrag_repository import (
AntragRepository,
SqliteAntragRepository,
InMemoryAntragRepository,
get_antrag_repository,
)
from .bewertung_repository import (
BewertungRepository,
SqliteBewertungRepository,
InMemoryBewertungRepository,
get_bewertung_repository,
)
from .abonnement_repository import (
AbonnementRepository,
SqliteAbonnementRepository,
InMemoryAbonnementRepository,
get_abonnement_repository,
)
__all__ = [
"AntragRepository",
"SqliteAntragRepository",
"InMemoryAntragRepository",
"get_antrag_repository",
"BewertungRepository",
"SqliteBewertungRepository",
"InMemoryBewertungRepository",
"get_bewertung_repository",
"AbonnementRepository",
"SqliteAbonnementRepository",
"InMemoryAbonnementRepository",
"get_abonnement_repository",
]

View File

@ -0,0 +1,138 @@
"""AbonnementRepository — Port für E-Mail-Digest-Abos (#124).
Kapselt die `email_subscriptions`-Tabelle. Der Name Abonnement" ist die
Ubiquitous-Language-Form (Kapitel 4 der DDD-Bewertung); intern heißt die
Tabelle weiter `email_subscriptions`.
"""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from .. import database
@runtime_checkable
class AbonnementRepository(Protocol):
async def create(
self,
user_id: str,
email: str,
bundesland: Optional[str] = None,
partei: Optional[str] = None,
frequency: str = "daily",
) -> int: ...
async def list_by_user(self, user_id: str) -> list[dict]: ...
async def list_all(self) -> list[dict]: ...
async def list_due(self, frequency: str = "daily") -> list[dict]: ...
async def delete(self, user_id: str, sub_id: int) -> bool: ...
async def delete_by_id(self, sub_id: int) -> bool: ...
async def mark_sent(self, sub_id: int) -> None: ...
class SqliteAbonnementRepository:
async def create(
self,
user_id: str,
email: str,
bundesland: Optional[str] = None,
partei: Optional[str] = None,
frequency: str = "daily",
) -> int:
return await database.create_subscription(
user_id, email, bundesland, partei, frequency,
)
async def list_by_user(self, user_id: str) -> list[dict]:
return await database.list_subscriptions(user_id)
async def list_all(self) -> list[dict]:
return await database.list_all_subscriptions()
async def list_due(self, frequency: str = "daily") -> list[dict]:
return await database.get_all_subscriptions_due(frequency)
async def delete(self, user_id: str, sub_id: int) -> bool:
return await database.delete_subscription(user_id, sub_id)
async def delete_by_id(self, sub_id: int) -> bool:
return await database.delete_subscription_by_id(sub_id)
async def mark_sent(self, sub_id: int) -> None:
await database.mark_subscription_sent(sub_id)
class InMemoryAbonnementRepository:
"""Test-Fake. Ignoriert ``last_sent``-Zeitberechnung — ``list_due`` gibt
einfach alle zurück, bei denen ``last_sent`` ``None`` ist. Für
Zeit-bezogene Tests explizit ``mark_sent`` nutzen."""
def __init__(self) -> None:
self._subs: list[dict] = []
self._next_id = 1
async def create(
self,
user_id: str,
email: str,
bundesland: Optional[str] = None,
partei: Optional[str] = None,
frequency: str = "daily",
) -> int:
sid = self._next_id
self._next_id += 1
self._subs.append({
"id": sid,
"user_id": user_id,
"email": email,
"bundesland": bundesland,
"partei": partei,
"frequency": frequency,
"last_sent": None,
"created_at": "",
})
return sid
async def list_by_user(self, user_id: str) -> list[dict]:
return [dict(s) for s in self._subs if s["user_id"] == user_id]
async def list_all(self) -> list[dict]:
return [dict(s) for s in self._subs]
async def list_due(self, frequency: str = "daily") -> list[dict]:
return [
dict(s) for s in self._subs
if s["frequency"] == frequency and s.get("last_sent") is None
]
async def delete(self, user_id: str, sub_id: int) -> bool:
for i, s in enumerate(self._subs):
if s["id"] == sub_id and s["user_id"] == user_id:
self._subs.pop(i)
return True
return False
async def delete_by_id(self, sub_id: int) -> bool:
for i, s in enumerate(self._subs):
if s["id"] == sub_id:
self._subs.pop(i)
return True
return False
async def mark_sent(self, sub_id: int) -> None:
for s in self._subs:
if s["id"] == sub_id:
s["last_sent"] = "sent"
_default_abonnement_repo: AbonnementRepository = SqliteAbonnementRepository()
def get_abonnement_repository() -> AbonnementRepository:
return _default_abonnement_repo

View File

@ -0,0 +1,135 @@
"""AntragRepository — Persistenz-Port für Assessment-Datensätze (#136, ADR 0008).
Der Name `AntragRepository` ist bewusst auf die Domäne bezogen: aus Sicht
der Anwendung speichern wir eine Bewertung *zu einem Antrag* die
Drucksachen-ID ist der Identifier. Intern zugreifen wir auf die
`assessments`-Tabelle.
Für Bewertungs-Versionen (assessment_versions) siehe `BewertungRepository`.
"""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from .. import database
@runtime_checkable
class AntragRepository(Protocol):
"""Port für den Zugriff auf Antrags-Bewertungen.
Rückgabe-Typ bleibt vorerst ``dict`` (wie heute von ``database.get_assessment``
geliefert), um die Umstellung möglichst diff-arm zu halten. Ein
Domain-Objekt-Wrapper (Kapitel 3.2 der DDD-Bewertung) kommt als
Tag-6-Schritt. Wichtig: callsites sollen *nicht* weiter ``database.*``
direkt importieren.
"""
async def save(self, data: dict) -> bool: ...
async def get(self, drucksache: str) -> Optional[dict]: ...
async def list(self, bundesland: Optional[str] = None) -> list[dict]: ...
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]: ...
async def delete(self, drucksache: str) -> bool: ...
class SqliteAntragRepository:
"""Produktions-Implementation. Delegiert an ``database.py``.
Hält bewusst *keinen* Connection-Pool ``database.py`` öffnet pro
Aufruf eine Connection (``aiosqlite.connect``). Bei Performance-
Regressionen später zentralisieren.
"""
async def save(self, data: dict) -> bool:
return await database.upsert_assessment(data)
async def get(self, drucksache: str) -> Optional[dict]:
return await database.get_assessment(drucksache)
async def list(self, bundesland: Optional[str] = None) -> list[dict]:
return await database.get_all_assessments(bundesland)
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]:
return await database.search_assessments(query, bundesland, limit)
async def delete(self, drucksache: str) -> bool:
return await database.delete_assessment(drucksache)
class InMemoryAntragRepository:
"""Test-Fake. Keine Datei, kein I/O — in-process Dict.
Bei mehrfachem ``save`` für dieselbe Drucksache wird überschrieben
(wie im produktiven UPSERT). Versionierung simuliert das Fake bewusst
nicht dafür gibt es ``BewertungRepository`` als separaten Port.
"""
def __init__(self, initial: Optional[list[dict]] = None) -> None:
self._store: dict[str, dict] = {}
for d in initial or []:
ds = d.get("drucksache")
if ds:
self._store[ds] = dict(d)
async def save(self, data: dict) -> bool:
ds = data.get("drucksache")
if not ds:
raise ValueError("save(): data.drucksache ist Pflicht")
self._store[ds] = dict(data)
return True
async def get(self, drucksache: str) -> Optional[dict]:
row = self._store.get(drucksache)
return dict(row) if row else None
async def list(self, bundesland: Optional[str] = None) -> list[dict]:
rows = list(self._store.values())
if bundesland and bundesland != "ALL":
rows = [r for r in rows if r.get("bundesland") == bundesland]
# Sortierung analog zu database.get_all_assessments: gwoe_score desc
rows.sort(key=lambda r: (r.get("gwoe_score") or 0), reverse=True)
return [dict(r) for r in rows]
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]:
q = (query or "").lower()
out: list[dict] = []
for r in self._store.values():
if bundesland and bundesland != "ALL" and r.get("bundesland") != bundesland:
continue
hay = " ".join([
str(r.get("title") or ""),
str(r.get("drucksache") or ""),
" ".join(r.get("fraktionen") or []) if isinstance(r.get("fraktionen"), list) else str(r.get("fraktionen") or ""),
" ".join(r.get("themen") or []) if isinstance(r.get("themen"), list) else str(r.get("themen") or ""),
]).lower()
if q in hay:
out.append(dict(r))
out.sort(key=lambda r: (r.get("gwoe_score") or 0), reverse=True)
return out[:limit]
async def delete(self, drucksache: str) -> bool:
return self._store.pop(drucksache, None) is not None
# ─── FastAPI-Dependency ─────────────────────────────────────────────────────
_default_antrag_repo: AntragRepository = SqliteAntragRepository()
def get_antrag_repository() -> AntragRepository:
"""FastAPI-``Depends()``-Provider. In Tests via
``app.dependency_overrides[get_antrag_repository] = lambda: InMemoryAntragRepository()``
überschreibbar.
"""
return _default_antrag_repo

View File

@ -0,0 +1,64 @@
"""BewertungRepository — Port für die Versionshistorie einer Bewertung.
Eine Bewertung" ist die vollständige Assessment-Instanz; der
`BewertungRepository` greift auf die Snapshot-Tabelle
``assessment_versions`` zu. Für die aktuellste Bewertung siehe
``AntragRepository``.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from .. import database
@runtime_checkable
class BewertungRepository(Protocol):
async def versions(self, drucksache: str) -> list[dict]: ...
class SqliteBewertungRepository:
"""Produktions-Implementation. Delegiert an ``database.py``."""
async def versions(self, drucksache: str) -> list[dict]:
return await database.get_assessment_history(drucksache)
class InMemoryBewertungRepository:
"""Test-Fake. Erlaubt per ``add_version`` händisches Bestücken.
Die produktive Versionierung passiert implizit in ``upsert_assessment``
(siehe database.py:580-598). Im Fake trennen wir das bewusst, weil
Tests oft explizit Versionshistorie befüllen wollen.
"""
def __init__(self) -> None:
self._versions: dict[str, list[dict]] = {}
def add_version(
self,
drucksache: str,
version: int,
gwoe_score: float,
model: str,
created_at: str = "",
) -> None:
self._versions.setdefault(drucksache, []).append({
"version": version,
"gwoe_score": gwoe_score,
"model": model,
"created_at": created_at,
})
async def versions(self, drucksache: str) -> list[dict]:
rows = list(self._versions.get(drucksache, []))
rows.sort(key=lambda r: r["version"], reverse=True)
return rows
_default_bewertung_repo: BewertungRepository = SqliteBewertungRepository()
def get_bewertung_repository() -> BewertungRepository:
return _default_bewertung_repo

View File

@ -0,0 +1,153 @@
# 0008 — DDD-Lightweight-Migration
| | |
|---|---|
| **Status** | accepted |
| **Datum** | 2026-04-20 |
| **Refs** | Issue #136, `docs/analysen/ddd-bewertung.md` Kap. 7 + 10 |
| **Related** | ADR 0001 (Citation-Binding), ADR 0002 (Adapter-Architektur) |
## Kontext
Nach 18 Monaten Entwicklung verteilt der Code sich auf 23 Dateien in
einem flachen `app/`-Verzeichnis (LOC-Inventar in `ddd-bewertung.md`
Kap. 1.1). Die Analyse in diesem Dokument zeigt fünf konkrete
DDD-Verletzungen (AE, ebenda Kap. 1.2):
- **A** Infrastructure-Leak: `analyze_antrag` instanziiert direkt einen
`AsyncOpenAI`-Client.
- **B** Retry-Loop, JSON-Parsing und Pydantic in einer 60-Zeilen-Kaskade.
- **C** Anämisches Modell: `MatrixEntry` / `Assessment` tragen keine
Invarianten; die Score-Cap-Regel („rating ≤ -4 ⇒ score ≤ 3") lebt nur
im LLM-System-Prompt.
- **D** Kein Repository-Pattern: `database.py` (909 LOC) wird von sechs
Modulen direkt aufgerufen.
- **E** Adapter-Contract nur informell (abhandelbar in späterem PR).
Issue #136 fragte nach einer DDD-Umstellung. Die Analyse zeigte zwei
realistische Optionen (Kap. 7 des Bewertungsdokuments).
## Optionen
### Option A — Voll-DDD mit Package-Split
Getrennte Packages `antragsbewertung/`, `parlamentsintegration/`,
`wahlprogramm_wissensbasis/`, `publikation/`, `benutzer_abo/`,
`monitoring/`, jeweils mit `domain/application/infrastructure/`-
Struktur. Ports und Adapter überall, Anti-Corruption-Layer pro BL.
**Vorteile:** Maximale Testbarkeit, klare Bounded-Contexts, mehrere
Devs können parallel arbeiten, Ubiquitous-Language konsequent.
**Nachteile:** 4-8 Wochen netto Umbau, Test-Suite muss migriert werden,
hohes Regressionsrisiko während der Migration, für Solo-Projekt dieser
Größe ein schlechtes Kosten-Nutzen-Verhältnis.
### Option B — DDD-Lightweight
Drei gezielte DDD-Prinzipien *ohne* Package-Split: Repository-Pattern,
LLM-Port, Domain-Verhalten auf den bestehenden Pydantic-Modellen.
Dateien bleiben flach in `app/` liegen, nur drei neue Unterordner:
`repositories/`, `ports/`, `adapters/`.
**Vorteile:** Adressiert die schmerzhaftesten Punkte (A, B, C, D) in
5-8 Tagen. Keine API-Breaking-Changes. Callsites müssen nicht alle
gleichzeitig migrieren — die Repositories delegieren an die alten
`database.py`-Funktionen, alte Calls laufen weiter.
**Nachteile:** Parlaments-Adapter bleiben als eine 3397-LOC-Datei
(`parlamente.py`). Ubiquitous-Language bleibt halb Deutsch / halb
Englisch. Bounded-Contexts sind nur konzeptuell, nicht physisch
separiert.
## Entscheidung
**Option B**. Konkret:
1. **Repository-Pattern** für `assessments`, `assessment_versions` und
`email_subscriptions``app/repositories/`, drei Module.
2. **LLM-Port + Qwen-Adapter**`app/ports/llm_bewerter.py`,
`app/adapters/qwen_bewerter.py`. `analyze_antrag` nimmt ein
`LlmBewerter`-Argument (Default: `QwenBewerter()`), direkt oder über
FastAPI-`Depends`.
3. **Domain-Verhalten** auf `Assessment` und `MatrixEntry`:
`verletzt_score_cap()`, `ist_ablehnung()`,
`ist_uneingeschraenkt_unterstuetzend()`, `hat_fundamental_kritisches_feld()`,
`MatrixEntry.ist_fundamental_kritisch()`, `MatrixEntry.to_symbol()`.
`analyzer.py` loggt bei Verletzung der Score-Cap-Invariante eine
Warning, wirft aber nicht — das LLM soll lernen, der Produktiv-
betrieb soll nicht brechen.
Nicht Teil dieser Entscheidung (Folge-PRs):
- Migration aller `database.*`-Callsites in `main.py` (21 Stellen)
auf Repository-`Depends`. Der Repository-Layer ist dafür
bereitgelegt; die Umstellung selbst ist mechanisch, aber wegen
1746 LOC in einer Datei mit Merge-Konflikt-Risiko verbunden.
- `ZitatVerifier`-Port (Tag 5 der Roadmap).
- Frozen-Dataclass-Domain-Objekte in `app/domain/` (Tag 6/7).
- Bounded-Context-Package-Split (explizit ausgeschlossen).
## Konsequenzen
### Positiv
- **Tests ohne OpenAI-Stub**. `conftest.py::_stub("openai")` ist nicht
mehr zwingend; Tests reichen einen `FakeLlmBewerter`-Objekt. Der
umgebaute `test_bug_regressions.py::test_analyzer_user_prompt_...`
ist exakt dieses Muster.
- **Adapter-Wechsel trivial**. Ein zweiter LLM-Provider (Claude,
Gemini) ist eine neue Klasse in `app/adapters/`, ohne Änderung in
`analyzer.py`.
- **Server-seitige Score-Cap-Erkennung**. Verstöße gegen die
fundamental-kritisch-Regel aus dem System-Prompt werden jetzt in
Logs sichtbar (`analyzer.py`), können in einem Folge-Issue zu
harten Rejects hochgezogen werden, wenn die Qualitäts-Daten stabil
aussehen.
- **Test-Fakes für DB-Zugriff**. Application-Logik-Tests brauchen
keine SQLite-Datei mehr; `InMemoryAntragRepository` reicht. Die
76 neuen Tests in `tests/test_*_repository.py`, `test_llm_bewerter.py`
und `test_domain_behavior.py` demonstrieren das.
### Negativ
- **Zwei Zugangswege zur DB parallel**. Solange `main.py` noch
`database.get_assessment` direkt aufruft, gibt es doppelten Zugriff.
Folge-PR räumt auf.
- **`analyzer.py` importiert noch `check_missing_programmes` direkt**.
Auch das ist Infrastructure, liegt aber außerhalb der drei
migrierten Schnitte. Nächster Schritt bei Bedarf.
- **Logger-Warning ist weich**. Wer die Logs nicht liest, bemerkt
Score-Cap-Verletzungen nicht. Ein Dashboard-Panel auf
`logger.name == "app.analyzer"` + Severity ≥ WARNING gehört in
die Monitoring-Erweiterung (#135-Folge).
### Folgen für andere ADRs
- **ADR 0001** (Citation-Binding) bleibt unverändert gültig — die
Post-LLM-Rekonstruktion läuft weiter nach `bewerter.bewerte()`.
- **ADR 0002** (Adapter-Architektur für Parlamente) bleibt unverändert;
die 17 Adapter in `parlamente.py` werden hier *nicht* in
`app/adapters/` gezogen. Ein Folge-ADR (0009?) könnte den Package-
Split für Parlaments-Adapter beschließen, wenn die Datei über 5000
LOC wächst (wie in ADR 0002 angekündigt).
- **ADR 0003** (Citation-Property-Tests) bleibt unverändert; die
Tests hängen an der Assessment-Schnittstelle, die sich hier nicht
geändert hat.
## Nach-Migration-Regeln
Zur Orientierung bei neuen Beiträgen:
1. **Kein direktes `database.*` in neuer Application-Logik.** Immer
über das passende Repository.
2. **Kein `AsyncOpenAI` außerhalb `app/adapters/qwen_bewerter.py`.**
Neue LLM-Provider bekommen einen eigenen Adapter im gleichen
Ordner.
3. **Invarianten auf dem Domain-Modell, nicht im Prompt.** Wenn eine
Regel im LLM-System-Prompt steht und vom LLM potenziell verletzt
werden kann, gehört eine Prüfmethode auf `Assessment` oder
`MatrixEntry` und ein Warning-Log in `analyzer.py`.
4. **Tests mit `InMemory*Repository` und `FakeLlmBewerter`.** Keine
Monkeypatches auf `app.analyzer.AsyncOpenAI` mehr; keine
temporären SQLite-Dateien für Unit-Tests.