gwoe-antragspruefer/app/repositories/antrag_repository.py

136 lines
5.1 KiB
Python
Raw Permalink Normal View History

"""AntragRepository — Persistenz-Port für Assessment-Datensätze (#136, ADR 0008).
Der Name `AntragRepository` ist bewusst auf die Domäne bezogen: aus Sicht
der Anwendung speichern wir eine Bewertung *zu einem Antrag* die
Drucksachen-ID ist der Identifier. Intern zugreifen wir auf die
`assessments`-Tabelle.
Für Bewertungs-Versionen (assessment_versions) siehe `BewertungRepository`.
"""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from .. import database
@runtime_checkable
class AntragRepository(Protocol):
"""Port für den Zugriff auf Antrags-Bewertungen.
Rückgabe-Typ bleibt vorerst ``dict`` (wie heute von ``database.get_assessment``
geliefert), um die Umstellung möglichst diff-arm zu halten. Ein
Domain-Objekt-Wrapper (Kapitel 3.2 der DDD-Bewertung) kommt als
Tag-6-Schritt. Wichtig: callsites sollen *nicht* weiter ``database.*``
direkt importieren.
"""
async def save(self, data: dict) -> bool: ...
async def get(self, drucksache: str) -> Optional[dict]: ...
async def list(self, bundesland: Optional[str] = None) -> list[dict]: ...
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]: ...
async def delete(self, drucksache: str) -> bool: ...
class SqliteAntragRepository:
"""Produktions-Implementation. Delegiert an ``database.py``.
Hält bewusst *keinen* Connection-Pool ``database.py`` öffnet pro
Aufruf eine Connection (``aiosqlite.connect``). Bei Performance-
Regressionen später zentralisieren.
"""
async def save(self, data: dict) -> bool:
return await database.upsert_assessment(data)
async def get(self, drucksache: str) -> Optional[dict]:
return await database.get_assessment(drucksache)
async def list(self, bundesland: Optional[str] = None) -> list[dict]:
return await database.get_all_assessments(bundesland)
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]:
return await database.search_assessments(query, bundesland, limit)
async def delete(self, drucksache: str) -> bool:
return await database.delete_assessment(drucksache)
class InMemoryAntragRepository:
"""Test-Fake. Keine Datei, kein I/O — in-process Dict.
Bei mehrfachem ``save`` für dieselbe Drucksache wird überschrieben
(wie im produktiven UPSERT). Versionierung simuliert das Fake bewusst
nicht dafür gibt es ``BewertungRepository`` als separaten Port.
"""
def __init__(self, initial: Optional[list[dict]] = None) -> None:
self._store: dict[str, dict] = {}
for d in initial or []:
ds = d.get("drucksache")
if ds:
self._store[ds] = dict(d)
async def save(self, data: dict) -> bool:
ds = data.get("drucksache")
if not ds:
raise ValueError("save(): data.drucksache ist Pflicht")
self._store[ds] = dict(data)
return True
async def get(self, drucksache: str) -> Optional[dict]:
row = self._store.get(drucksache)
return dict(row) if row else None
async def list(self, bundesland: Optional[str] = None) -> list[dict]:
rows = list(self._store.values())
if bundesland and bundesland != "ALL":
rows = [r for r in rows if r.get("bundesland") == bundesland]
# Sortierung analog zu database.get_all_assessments: gwoe_score desc
rows.sort(key=lambda r: (r.get("gwoe_score") or 0), reverse=True)
return [dict(r) for r in rows]
async def search(
self, query: str, bundesland: Optional[str] = None, limit: int = 50,
) -> list[dict]:
q = (query or "").lower()
out: list[dict] = []
for r in self._store.values():
if bundesland and bundesland != "ALL" and r.get("bundesland") != bundesland:
continue
hay = " ".join([
str(r.get("title") or ""),
str(r.get("drucksache") or ""),
" ".join(r.get("fraktionen") or []) if isinstance(r.get("fraktionen"), list) else str(r.get("fraktionen") or ""),
" ".join(r.get("themen") or []) if isinstance(r.get("themen"), list) else str(r.get("themen") or ""),
]).lower()
if q in hay:
out.append(dict(r))
out.sort(key=lambda r: (r.get("gwoe_score") or 0), reverse=True)
return out[:limit]
async def delete(self, drucksache: str) -> bool:
return self._store.pop(drucksache, None) is not None
# ─── FastAPI-Dependency ─────────────────────────────────────────────────────
_default_antrag_repo: AntragRepository = SqliteAntragRepository()
def get_antrag_repository() -> AntragRepository:
"""FastAPI-``Depends()``-Provider. In Tests via
``app.dependency_overrides[get_antrag_repository] = lambda: InMemoryAntragRepository()``
überschreibbar.
"""
return _default_antrag_repo