gwoe-antragspruefer/app/repositories/abonnement_repository.py
Dotty Dotter 8f0f6d6e32 refactor(#136): DDD-Lightweight Tag 1-4 (Ports, Adapter, Repositories, Domain-Verhalten)
ADR 0008: Lightweight-Migration ohne Package-Split

- ports/llm_bewerter.py: Protocol + LlmRequest-Dataclass
- adapters/qwen_bewerter.py: Qwen/DashScope-Adapter mit Retry-Loop
- repositories/{antrag,bewertung,abonnement}_repository.py: Protocol + Sqlite-Impl + InMemory-Fake
- analyzer.py refactored: nimmt Optional[LlmBewerter], AsyncOpenAI-Import raus
- models.py: 5 Domain-Methoden auf Bewertung/MatrixEntry
  (ist_ablehnung, hat_fundamental_kritisches_feld, verletzt_score_cap, ...)
- analyzer loggt WARNING wenn LLM Score-Cap-Invariante verletzt

Folge-PR: Callsite-Migration in main.py (~21 direkte database.*-Aufrufe)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 20:55:16 +02:00

139 lines
4.1 KiB
Python

"""AbonnementRepository — Port für E-Mail-Digest-Abos (#124).
Kapselt die `email_subscriptions`-Tabelle. Der Name „Abonnement" ist die
Ubiquitous-Language-Form (Kapitel 4 der DDD-Bewertung); intern heißt die
Tabelle weiter `email_subscriptions`.
"""
from __future__ import annotations
from typing import Optional, Protocol, runtime_checkable
from .. import database
@runtime_checkable
class AbonnementRepository(Protocol):
    """Structural interface for storing e-mail digest subscriptions.

    Any object that provides the coroutine methods below satisfies this
    protocol. Because it is ``runtime_checkable``, ``isinstance`` checks only
    verify that the methods exist, not that their signatures match.
    Subscriptions are exchanged as plain ``dict`` records.
    """

    async def create(self, user_id: str, email: str, bundesland: Optional[str] = None, partei: Optional[str] = None, frequency: str = "daily") -> int:
        """Create a subscription for ``user_id`` and return an integer id for it.

        ``bundesland`` and ``partei`` are optional and default to ``None``;
        ``frequency`` defaults to ``"daily"``.
        """

    async def list_by_user(self, user_id: str) -> list[dict]:
        """Return the subscription records belonging to ``user_id``."""

    async def list_all(self) -> list[dict]:
        """Return every stored subscription record."""

    async def list_due(self, frequency: str = "daily") -> list[dict]:
        """Return the subscriptions of the given ``frequency`` that are due.

        What counts as "due" is decided by the implementation.
        """

    async def delete(self, user_id: str, sub_id: int) -> bool:
        """Delete subscription ``sub_id`` on behalf of ``user_id``.

        The boolean result reports the outcome; the in-memory implementation
        in this module returns ``True`` only when a record with that id and
        that owner was removed.
        """

    async def delete_by_id(self, sub_id: int) -> bool:
        """Delete subscription ``sub_id`` without an owner check.

        The boolean result reports the outcome; the in-memory implementation
        in this module returns ``True`` only when a record was removed.
        """

    async def mark_sent(self, sub_id: int) -> None:
        """Record that a digest was sent for subscription ``sub_id``."""
class SqliteAbonnementRepository:
    """Repository implementation that forwards every call to ``database``.

    Each method passes its arguments, unchanged and positionally, to the
    matching coroutine of the project's ``database`` module and hands back
    whatever that coroutine returns. The class holds no state of its own.
    """

    async def create(self, user_id: str, email: str, bundesland: Optional[str] = None, partei: Optional[str] = None, frequency: str = "daily") -> int:
        """Forward to ``database.create_subscription`` and return its result."""
        new_id = await database.create_subscription(
            user_id, email, bundesland, partei, frequency
        )
        return new_id

    async def list_by_user(self, user_id: str) -> list[dict]:
        """Forward to ``database.list_subscriptions`` for ``user_id``."""
        rows = await database.list_subscriptions(user_id)
        return rows

    async def list_all(self) -> list[dict]:
        """Forward to ``database.list_all_subscriptions``."""
        rows = await database.list_all_subscriptions()
        return rows

    async def list_due(self, frequency: str = "daily") -> list[dict]:
        """Forward to ``database.get_all_subscriptions_due`` for ``frequency``."""
        rows = await database.get_all_subscriptions_due(frequency)
        return rows

    async def delete(self, user_id: str, sub_id: int) -> bool:
        """Forward to ``database.delete_subscription`` and return its result."""
        outcome = await database.delete_subscription(user_id, sub_id)
        return outcome

    async def delete_by_id(self, sub_id: int) -> bool:
        """Forward to ``database.delete_subscription_by_id`` and return its result."""
        outcome = await database.delete_subscription_by_id(sub_id)
        return outcome

    async def mark_sent(self, sub_id: int) -> None:
        """Forward to ``database.mark_subscription_sent``; the result is discarded."""
        await database.mark_subscription_sent(sub_id)
        return None
class InMemoryAbonnementRepository:
    """Test fake that keeps subscriptions in a plain Python list.

    It ignores any ``last_sent`` time calculation: ``list_due`` simply returns
    every subscription of the requested frequency whose ``last_sent`` is
    ``None``. For time-related tests, call ``mark_sent`` explicitly.
    """

    def __init__(self) -> None:
        """Start with no subscriptions; ids are handed out from 1 upwards."""
        self._subs: list[dict] = []
        self._next_id = 1

    async def create(self, user_id: str, email: str, bundesland: Optional[str] = None, partei: Optional[str] = None, frequency: str = "daily") -> int:
        """Append a new subscription record and return its freshly assigned id."""
        new_id = self._next_id
        self._next_id = new_id + 1
        record = {
            "id": new_id,
            "user_id": user_id,
            "email": email,
            "bundesland": bundesland,
            "partei": partei,
            "frequency": frequency,
            # A new subscription has never been sent, so list_due reports it.
            "last_sent": None,
            "created_at": "",
        }
        self._subs.append(record)
        return new_id

    async def list_by_user(self, user_id: str) -> list[dict]:
        """Return shallow copies of the records whose ``user_id`` matches."""
        owned = []
        for entry in self._subs:
            if entry["user_id"] == user_id:
                owned.append(entry.copy())
        return owned

    async def list_all(self) -> list[dict]:
        """Return shallow copies of all records in insertion order."""
        return [entry.copy() for entry in self._subs]

    async def list_due(self, frequency: str = "daily") -> list[dict]:
        """Return copies of the never-sent records with the given ``frequency``."""
        due = []
        for entry in self._subs:
            if entry["frequency"] != frequency:
                continue
            if entry.get("last_sent") is None:
                due.append(entry.copy())
        return due

    async def delete(self, user_id: str, sub_id: int) -> bool:
        """Remove the first record matching both id and owner.

        Returns ``True`` when a record was removed, ``False`` otherwise.
        """
        position = next(
            (
                idx
                for idx, entry in enumerate(self._subs)
                if entry["id"] == sub_id and entry["user_id"] == user_id
            ),
            None,
        )
        if position is None:
            return False
        del self._subs[position]
        return True

    async def delete_by_id(self, sub_id: int) -> bool:
        """Remove the first record with the given id, regardless of owner.

        Returns ``True`` when a record was removed, ``False`` otherwise.
        """
        position = next(
            (idx for idx, entry in enumerate(self._subs) if entry["id"] == sub_id),
            None,
        )
        if position is None:
            return False
        del self._subs[position]
        return True

    async def mark_sent(self, sub_id: int) -> None:
        """Set ``last_sent`` to the marker ``"sent"`` on records with this id.

        Unknown ids are ignored silently.
        """
        for entry in self._subs:
            if entry["id"] != sub_id:
                continue
            entry["last_sent"] = "sent"
# Default repository instance, created once when this module is imported.
_default_abonnement_repo: AbonnementRepository = SqliteAbonnementRepository()


def get_abonnement_repository() -> AbonnementRepository:
    """Return the module-level default repository.

    The object is looked up on every call, so each call returns whatever
    ``_default_abonnement_repo`` currently refers to.
    """
    shared_repo = _default_abonnement_repo
    return shared_repo