Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
50 lines
1.8 KiB
Python
50 lines
1.8 KiB
Python
"""Tests für BewertungRepository — Assessment-Versionshistorie (#136, ADR 0008)."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from app.repositories import (
|
|
BewertungRepository,
|
|
InMemoryBewertungRepository,
|
|
SqliteBewertungRepository,
|
|
)
|
|
|
|
|
|
def _run(coro):
|
|
return asyncio.get_event_loop().run_until_complete(coro)
|
|
|
|
|
|
class TestProtocolConformance:
|
|
def test_in_memory_implements_protocol(self):
|
|
repo = InMemoryBewertungRepository()
|
|
assert isinstance(repo, BewertungRepository)
|
|
|
|
def test_sqlite_implements_protocol(self):
|
|
repo = SqliteBewertungRepository()
|
|
assert isinstance(repo, BewertungRepository)
|
|
|
|
|
|
class TestVersionHistory:
|
|
def test_empty_history_for_unknown_drucksache(self):
|
|
repo = InMemoryBewertungRepository()
|
|
assert _run(repo.versions("18/1")) == []
|
|
|
|
def test_versions_sorted_newest_first(self):
|
|
"""Contract: neueste Version zuerst — wie bei
|
|
database.get_assessment_history (ORDER BY version DESC)."""
|
|
repo = InMemoryBewertungRepository()
|
|
repo.add_version("18/1", version=1, gwoe_score=4.0, model="qwen-plus")
|
|
repo.add_version("18/1", version=2, gwoe_score=7.0, model="qwen-plus")
|
|
repo.add_version("18/1", version=3, gwoe_score=6.5, model="qwen-plus")
|
|
rows = _run(repo.versions("18/1"))
|
|
assert [r["version"] for r in rows] == [3, 2, 1]
|
|
|
|
def test_versions_filter_by_drucksache(self):
|
|
repo = InMemoryBewertungRepository()
|
|
repo.add_version("18/1", version=1, gwoe_score=5.0, model="qwen-plus")
|
|
repo.add_version("18/2", version=1, gwoe_score=8.0, model="qwen-plus")
|
|
rows_a = _run(repo.versions("18/1"))
|
|
rows_b = _run(repo.versions("18/2"))
|
|
assert len(rows_a) == 1 and rows_a[0]["gwoe_score"] == 5.0
|
|
assert len(rows_b) == 1 and rows_b[0]["gwoe_score"] == 8.0
|