Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
354 lines
13 KiB
Python
354 lines
13 KiB
Python
"""Unit-Tests für app/monitoring.py (#135).
|
|
|
|
Testet:
|
|
- Kosten-Schätzung (estimate_cost_qwen_plus)
|
|
- daily_scan() mit Fake-Adapter (kein Netzwerk, kein LLM)
|
|
- daily_summary-Aggregation über mehrere Bundesländer
|
|
- Fehlerbehandlung: Adapter-Exception soll anderen BL nicht blockieren
|
|
- Plaintext-Render (_render_plain)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import sys
|
|
import types
|
|
from dataclasses import dataclass
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
# ─── Dependency-Stubs (analog conftest.py) ──────────────────────────────────
|
|
|
|
def _stub(name: str, **attrs) -> None:
    """Register a minimal fake module under *name* unless one is already imported.

    Any keyword arguments become attributes of the stub module, so callers
    can provide just the names the code under test actually touches.
    """
    if name not in sys.modules:
        stub_module = types.ModuleType(name)
        for attr_name, attr_value in attrs.items():
            setattr(stub_module, attr_name, attr_value)
        sys.modules[name] = stub_module
|
|
|
|
# Stub heavy optional dependencies (mirrors conftest.py) so that importing
# app.monitoring below does not require aiosqlite/PyMuPDF/bs4/openai to be
# installed; the stubs expose only the attributes the imports touch.
_stub("aiosqlite")
_stub("fitz")
_stub("bs4", BeautifulSoup=lambda *a, **kw: None)
_stub("openai", OpenAI=lambda **kw: None)
|
|
|
|
# ─── Imports ─────────────────────────────────────────────────────────────────
|
|
|
|
from app.monitoring import (
|
|
estimate_cost_qwen_plus,
|
|
BundeslandScanResult,
|
|
DailyScanResult,
|
|
_render_plain,
|
|
_search_adapter,
|
|
_QWEN_PLUS_INPUT_USD_PER_1K,
|
|
_QWEN_PLUS_OUTPUT_USD_PER_1K,
|
|
_USD_TO_EUR,
|
|
)
|
|
|
|
|
|
# ─── Hilfsobjekte ─────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class FakeDrucksache:
    """Lightweight stand-in for a parliamentary document (Drucksache) record.

    Only the fields read by the monitoring code are modelled; the defaults
    keep test construction short.
    """

    drucksache: str   # document number, e.g. "18/1"
    title: str
    bundesland: str   # federal-state code, e.g. "NRW"
    fraktionen: list  # submitting parliamentary groups, e.g. ["SPD"]
    datum: str = "2026-04-20"
    link: str = "https://example.com/test.pdf"
    typ: str = "Antrag"
    typ_normiert: str = "antrag"
|
|
|
|
|
|
class FakeAdapter:
    """Adapter stub whose ``search()`` serves a fixed list of documents.

    Every ``(query, limit)`` invocation is recorded in ``called_with`` so
    tests can assert on the query strategy; with ``fail=True`` each call
    raises ``ConnectionError`` instead of returning results.
    """

    def __init__(self, bundesland: str, docs: list, fail: bool = False):
        self.bundesland = bundesland
        self._docs = docs
        self._fail = fail
        self.called_with: list[tuple] = []

    async def search(self, query: str, limit: int = 20) -> list:
        self.called_with.append((query, limit))
        if not self._fail:
            return self._docs
        raise ConnectionError(f"Fake-Fehler für {self.bundesland}")
|
|
|
|
|
|
# ─── Kosten-Schätzung ────────────────────────────────────────────────────────
|
|
|
|
class TestEstimateCostQwenPlus:
    """Checks for the qwen-plus cost estimator."""

    def test_zero_new_is_zero_cost(self):
        assert estimate_cost_qwen_plus(0) == 0.0

    def test_negative_new_is_zero_cost(self):
        assert estimate_cost_qwen_plus(-5) == 0.0

    def test_one_antrag_reasonable_range(self):
        """One Antrag with default token counts should cost a few cents."""
        cost = estimate_cost_qwen_plus(1)
        assert 0.005 < cost < 0.02, f"Unerwartete Kosten: {cost}"

    def test_cost_scales_linearly(self):
        unit_cost = estimate_cost_qwen_plus(1)
        bulk_cost = estimate_cost_qwen_plus(10)
        # round() inside the function may introduce tiny drift -> 0.001 tolerance
        assert abs(bulk_cost - unit_cost * 10) < 0.001

    def test_manual_calculation(self):
        """Recomputes the pricing formula by hand and compares exactly."""
        count, tok_in, tok_out = 5, 20_000, 3_000
        expected_usd = (
            (tok_in / 1000) * _QWEN_PLUS_INPUT_USD_PER_1K * count
            + (tok_out / 1000) * _QWEN_PLUS_OUTPUT_USD_PER_1K * count
        )
        expected_eur = round(expected_usd * _USD_TO_EUR, 4)
        assert estimate_cost_qwen_plus(count, tok_in, tok_out) == expected_eur

    def test_custom_token_counts_used(self):
        low = estimate_cost_qwen_plus(10, avg_in_tokens=1000, avg_out_tokens=100)
        high = estimate_cost_qwen_plus(10, avg_in_tokens=50_000, avg_out_tokens=10_000)
        assert low < high

    def test_result_is_float(self):
        assert isinstance(estimate_cost_qwen_plus(3), float)
|
|
|
|
|
|
# ─── _search_adapter ─────────────────────────────────────────────────────────
|
|
|
|
class TestSearchAdapter:
    """Tests for _search_adapter's query/fallback strategy."""

    def test_empty_string_query_works(self):
        dok = FakeDrucksache("18/1", "Test", "NRW", ["SPD"])
        fake = FakeAdapter("NRW", [dok])
        hits = asyncio.run(_search_adapter(fake, "NRW"))
        assert len(hits) == 1
        # the very first attempt is made with the empty string
        assert fake.called_with[0][0] == ""

    def test_fallback_to_space_on_first_failure(self):
        """If the empty-string query fails, a second query is attempted."""
        dok = FakeDrucksache("18/2", "Fallback", "NRW", ["CDU"])
        attempts = [0]

        class PartialFailAdapter:
            bundesland = "NRW"

            async def search(self, query: str, limit: int = 20):
                attempts[0] += 1
                if query == "":
                    raise ValueError("Leerer Query nicht erlaubt")
                return [dok]

        hits = asyncio.run(_search_adapter(PartialFailAdapter(), "NRW"))
        assert len(hits) == 1
        assert attempts[0] == 2  # first attempt fails, second succeeds

    def test_all_queries_fail_raises(self):
        failing = FakeAdapter("NRW", [], fail=True)
        with pytest.raises(ConnectionError):
            asyncio.run(_search_adapter(failing, "NRW"))
|
|
|
|
|
|
# ─── daily_scan ──────────────────────────────────────────────────────────────
|
|
|
|
def _make_docs(bl: str, n: int) -> list:
    """Build *n* FakeDrucksache entries for Bundesland *bl*."""
    docs = []
    for idx in range(n):
        docs.append(
            FakeDrucksache(
                drucksache=f"{bl}/100{idx}",
                title=f"Testantrag {idx}",
                bundesland=bl,
                fraktionen=["SPD"],
            )
        )
    return docs
|
|
|
|
|
|
class TestDailyScan:
    """End-to-end tests for daily_scan() driven by fake adapters only.

    No network and no LLM: adapters, DB upserts and the active-Bundesland
    list are all patched out.
    """

    def _run_scan_with_adapters(
        self, adapters_dict: dict, bl_codes: list
    ) -> tuple[DailyScanResult, list[dict], list[dict]]:
        """Run daily_scan() with faked adapters and Bundesland list.

        Returns a 3-tuple: the DailyScanResult plus the recorded kwargs of
        every upsert_monitoring_scan and upsert_monitoring_summary call.
        """
        from app.bundeslaender import Bundesland

        # Minimal Bundesland records -- code is reused for all name fields,
        # since daily_scan() only needs the code to pick an adapter.
        fake_bls = [
            Bundesland(
                code=code,
                name=code,
                parlament_name=code,
                wahlperiode=1,
                wahlperiode_start="2024-01-01",
                naechste_wahl=None,
                regierungsfraktionen=[],
                landtagsfraktionen=[],
                doku_system="Test",
                doku_base_url="http://example.com",
                drucksache_format="1/1234",
                dokukratie_scraper=None,
                aktiv=True,
            )
            for code in bl_codes
        ]

        db_upsert_calls: list[dict] = []
        summary_calls: list[dict] = []

        async def fake_upsert_scan(**kwargs) -> bool:
            # Record the call; always report the entry as "new" so every
            # scanned document counts towards new_total.
            db_upsert_calls.append(kwargs)
            return True

        async def fake_upsert_summary(**kwargs) -> None:
            # Record the per-Bundesland summary writes.
            summary_calls.append(kwargs)

        import app.monitoring as mon_mod
        import app.database as db_mod

        with (
            patch("app.monitoring.aktive_bundeslaender", return_value=fake_bls),
            patch("app.monitoring.ADAPTERS", adapters_dict, create=True),
            patch.object(db_mod, "upsert_monitoring_scan", side_effect=fake_upsert_scan),
            patch.object(db_mod, "upsert_monitoring_summary", side_effect=fake_upsert_summary),
        ):
            # ADAPTERS is imported inside daily_scan() from app.parlamente --
            # patch it directly in that module's namespace via the import
            # reference and restore the original afterwards.
            import app.parlamente as parl_mod
            original_adapters = getattr(parl_mod, "ADAPTERS", {})
            parl_mod.ADAPTERS = adapters_dict
            try:
                result = asyncio.run(mon_mod.daily_scan())
            finally:
                parl_mod.ADAPTERS = original_adapters

        return result, db_upsert_calls, summary_calls

    def test_single_bl_all_new(self):
        """Three docs from one Bundesland: all counted as new, all upserted."""
        docs = _make_docs("NRW", 3)
        adapter = FakeAdapter("NRW", docs)
        result, upserts, summaries = self._run_scan_with_adapters({"NRW": adapter}, ["NRW"])

        assert result.new_total == 3
        assert result.total_seen == 3
        assert len(result.results) == 1
        assert result.results[0].bundesland == "NRW"
        assert len(upserts) == 3

    def test_multiple_bl_aggregated(self):
        """Counts from several Bundesländer are summed in the scan result."""
        adapters = {
            "NRW": FakeAdapter("NRW", _make_docs("NRW", 5)),
            "BY": FakeAdapter("BY", _make_docs("BY", 2)),
        }
        result, _, summaries = self._run_scan_with_adapters(adapters, ["NRW", "BY"])

        assert result.new_total == 7
        assert result.total_seen == 7
        assert len(result.results) == 2
        # one summary row per Bundesland
        bl_codes = {s["bundesland"] for s in summaries}
        assert "NRW" in bl_codes
        assert "BY" in bl_codes

    def test_adapter_exception_does_not_block_other_bls(self):
        """A failing adapter is reported but must not abort the other scans."""
        adapters = {
            "NRW": FakeAdapter("NRW", _make_docs("NRW", 3)),
            "BY": FakeAdapter("BY", [], fail=True),
            "BE": FakeAdapter("BE", _make_docs("BE", 2)),
        }
        result, upserts, summaries = self._run_scan_with_adapters(
            adapters, ["NRW", "BY", "BE"]
        )

        # NRW + BE succeed, BY fails
        assert result.new_total == 5
        assert len(result.errors) == 1
        assert "BY" in result.errors[0]

        successful_bls = [r.bundesland for r in result.results if not r.error]
        assert "NRW" in successful_bls
        assert "BE" in successful_bls

    def test_no_adapter_for_bl_skipped_gracefully(self):
        """A Bundesland without a registered adapter is skipped, not an error."""
        adapters = {}  # no adapter for any Bundesland
        result, upserts, _ = self._run_scan_with_adapters(adapters, ["NRW"])

        assert result.new_total == 0
        assert len(upserts) == 0
        assert len(result.errors) == 0

    def test_estimated_cost_non_zero_when_new_docs(self):
        docs = _make_docs("NRW", 10)
        adapters = {"NRW": FakeAdapter("NRW", docs)}
        result, _, _ = self._run_scan_with_adapters(adapters, ["NRW"])

        assert result.estimated_cost_eur > 0

    def test_scan_date_is_today(self):
        """scan_date is the current UTC date in YYYY-MM-DD format."""
        from datetime import datetime, timezone
        adapters = {"NRW": FakeAdapter("NRW", [])}
        result, _, _ = self._run_scan_with_adapters(adapters, ["NRW"])

        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        assert result.scan_date == today
|
|
|
|
|
|
# ─── _render_plain ────────────────────────────────────────────────────────────
|
|
|
|
class TestRenderPlain:
    """Tests for the plaintext e-mail rendering helper."""

    def _make_result(self, new_total=2, total_seen=10, errors=None) -> DailyScanResult:
        """Assemble a DailyScanResult with two BL rows (plus SN on error)."""
        per_bl = [
            BundeslandScanResult(bundesland="NRW", total_seen=8, new_count=2),
            BundeslandScanResult(bundesland="BY", total_seen=2, new_count=0),
        ]
        if errors:
            per_bl.append(
                BundeslandScanResult(
                    bundesland="SN", total_seen=0, new_count=0, error=errors[0]
                )
            )
        return DailyScanResult(
            scan_date="2026-04-20",
            results=per_bl,
            new_total=new_total,
            total_seen=total_seen,
            estimated_cost_eur=0.0093,
            errors=errors or [],
        )

    def test_contains_scan_date(self):
        rendered = _render_plain(self._make_result(), [])
        assert "2026-04-20" in rendered

    def test_contains_new_total(self):
        rendered = _render_plain(self._make_result(new_total=5), [])
        assert "5" in rendered

    def test_contains_bundesland_codes(self):
        rendered = _render_plain(self._make_result(), [])
        assert "NRW" in rendered
        assert "BY" in rendered

    def test_errors_listed_when_present(self):
        rendered = _render_plain(self._make_result(errors=["SN: Fake-Fehler"]), [])
        assert "Fehler" in rendered
        assert "SN" in rendered

    def test_new_docs_listed(self):
        neue_docs = [
            {
                "bundesland": "NRW",
                "drucksache": "18/9999",
                "title": "Klimaschutz Plus",
                "fraktionen": ["GRÜNE"],
            }
        ]
        rendered = _render_plain(self._make_result(), neue_docs)
        assert "18/9999" in rendered
        assert "Klimaschutz Plus" in rendered

    def test_truncation_after_30_docs(self):
        many = [
            {"bundesland": "NRW", "drucksache": f"18/{i}", "title": f"Antrag {i}", "fraktionen": []}
            for i in range(35)
        ]
        assert "und 5 weitere" in _render_plain(self._make_result(), many)

    def test_no_truncation_marker_for_30_or_fewer(self):
        exactly_thirty = [
            {"bundesland": "NRW", "drucksache": f"18/{i}", "title": f"Antrag {i}", "fraktionen": []}
            for i in range(30)
        ]
        assert "weitere" not in _render_plain(self._make_result(), exactly_thirty)
|