gwoe-antragspruefer/app/monitoring.py

333 lines
12 KiB
Python
Raw Normal View History

"""Täglicher Monitoring-Scan für neue Landtags-Drucksachen (#135).
Nur Metadaten kein PDF-Download, kein LLM-Call.
Ablauf:
1. Iteriert alle aktiven Bundesländer via aktive_bundeslaender().
2. Ruft adapter.search("", limit=50) (Fallback: " " oder "*") auf.
3. UPSERTs Treffer in monitoring_scans. seen_first_at bleibt stabil,
last_seen_at wird immer gesetzt.
4. Aggregiert Ergebnisse in monitoring_daily_summary.
5. Gibt ScanResult zurück, aus dem run_monitoring_digest() den
Mail-Digest baut.
Kosten-Schätzung (Qwen Plus, Stand April 2026):
Quelle: https://help.aliyun.com/zh/dashscope/developer-reference/tongyi-qianwen-7b-14b-72b-api-pricing
Input: 0.0004 USD / 1 K Token
Output: 0.0012 USD / 1 K Token
Kurs: 1 USD = 0.93 EUR (Näherung April 2026)
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from .bundeslaender import aktive_bundeslaender
logger = logging.getLogger(__name__)
# ─── Kosten-Schätzung ────────────────────────────────────────────────────────
# Preise aus DashScope-Dokumentation (USD, Stand April 2026):
# https://help.aliyun.com/zh/dashscope/developer-reference/tongyi-qianwen-7b-14b-72b-api-pricing
_QWEN_PLUS_INPUT_USD_PER_1K = 0.0004
_QWEN_PLUS_OUTPUT_USD_PER_1K = 0.0012
_USD_TO_EUR = 0.93 # Näherungskurs April 2026 (als Konstante OK für Schätzung)
# Default-Annahmen pro Analyse (Durchschnittswerte aus Produktionsbetrieb)
_DEFAULT_AVG_IN_TOKENS = 20_000
_DEFAULT_AVG_OUT_TOKENS = 3_000
def estimate_cost_qwen_plus(
n_new: int,
avg_in_tokens: int = _DEFAULT_AVG_IN_TOKENS,
avg_out_tokens: int = _DEFAULT_AVG_OUT_TOKENS,
) -> float:
"""Schätzt die Analysekosten in EUR für n_new neue Drucksachen (Qwen Plus).
Rechnet auf Basis der offiziellen DashScope-Preise, Umrechnung USDEUR
mit festem Näherungskurs. Ergebnis ist eine Schätzung, keine Garantie.
Args:
n_new: Anzahl neuer Drucksachen.
avg_in_tokens: Durchschnittliche Input-Token pro Antrag (Default 20 000).
avg_out_tokens: Durchschnittliche Output-Token pro Antrag (Default 3 000).
Returns:
Geschätzte Kosten in EUR.
"""
if n_new <= 0:
return 0.0
input_cost_usd = (avg_in_tokens / 1000) * _QWEN_PLUS_INPUT_USD_PER_1K * n_new
output_cost_usd = (avg_out_tokens / 1000) * _QWEN_PLUS_OUTPUT_USD_PER_1K * n_new
total_eur = (input_cost_usd + output_cost_usd) * _USD_TO_EUR
return round(total_eur, 4)
# ─── Datenklassen ────────────────────────────────────────────────────────────
@dataclass
class BundeslandScanResult:
"""Scan-Ergebnis für ein einzelnes Bundesland."""
bundesland: str
total_seen: int = 0
new_count: int = 0
error: str | None = None
@dataclass
class DailyScanResult:
"""Gesamtergebnis eines daily_scan()-Laufs."""
scan_date: str # YYYY-MM-DD
results: list[BundeslandScanResult] = field(default_factory=list)
new_total: int = 0 # Summe aller new_count
total_seen: int = 0 # Summe aller total_seen
estimated_cost_eur: float = 0.0
errors: list[str] = field(default_factory=list)
# ─── Adapter-Suche ───────────────────────────────────────────────────────────
DEFAULT_DAILY_LIMIT = 50
# Bundesländer, die vom täglichen Monitoring-Scan ausgenommen sind.
# NI (Niedersachsen): NILAS-Portal erfordert Login — unauthentifizierte Anfragen
# liefern Login-Page-HTML, das der JSON-Comment-Parser als ~50 Junk-Records parsed.
# Ausnahme bleibt bis ein gültiger HAR-Capture vorliegt (siehe Issue #22).
_MONITORING_SKIP: frozenset[str] = frozenset({"NI"})
async def _search_adapter(adapter, bundesland_code: str, limit: int = DEFAULT_DAILY_LIMIT) -> list:
"""Sucht via Adapter nach aktuellen Drucksachen.
Probiert der Reihe nach: leerer String, Leerzeichen, Sternchen
und fängt alle Exceptions ab, damit ein Adapter-Fehler den
Gesamt-Scan nicht abbricht. ``limit`` steuert pro-Adapter-Obergrenze;
für Initial-Seeding ggf. höher setzen.
"""
for query in ("", " ", "*"):
try:
results = await adapter.search(query, limit=limit)
return results
except Exception as e:
if query == "*":
# Alle Versuche gescheitert — Exception nach oben durchreichen
raise
logger.debug(
"%s: search(%r) fehlgeschlagen (%s), versuche nächsten Query",
bundesland_code, query, e,
)
return []
# ─── Haupt-Scan ──────────────────────────────────────────────────────────────
async def daily_scan(limit: int = DEFAULT_DAILY_LIMIT) -> DailyScanResult:
"""Täglicher Scan aller aktiven Bundesländer nach neuen Drucksachen.
Kein PDF-Download, kein LLM-Call nur Metadaten. ``limit`` gilt
pro Adapter; für Initial-Seeding größer setzen (z.B. 500).
"""
from .parlamente import ADAPTERS
from .database import upsert_monitoring_scan, upsert_monitoring_summary
now_utc = datetime.now(timezone.utc)
scan_date = now_utc.strftime("%Y-%m-%d")
now_iso = now_utc.strftime("%Y-%m-%dT%H:%M:%S")
result = DailyScanResult(scan_date=scan_date)
active_bls = aktive_bundeslaender()
for bl in active_bls:
if bl.code in _MONITORING_SKIP:
logger.debug("%s: Monitoring-Skip aktiv — übersprungen", bl.code)
continue
adapter = ADAPTERS.get(bl.code)
if adapter is None:
logger.debug("Kein Adapter für %s — übersprungen", bl.code)
continue
bl_result = BundeslandScanResult(bundesland=bl.code)
try:
docs = await _search_adapter(adapter, bl.code, limit=limit)
except Exception as exc:
err_msg = f"{type(exc).__name__}: {str(exc)[:500]}"
logger.exception("Adapter-Fehler bei %s", bl.code)
bl_result.error = err_msg
result.errors.append(f"{bl.code}: {err_msg}")
await upsert_monitoring_summary(
scan_date=scan_date,
bundesland=bl.code,
total_seen=0,
new_count=0,
errors=err_msg,
)
result.results.append(bl_result)
continue
bl_result.total_seen = len(docs)
new_this_bl = 0
for doc in docs:
try:
is_new = await upsert_monitoring_scan(
bundesland=doc.bundesland,
drucksache=doc.drucksache,
title=doc.title,
datum=doc.datum,
typ=doc.typ,
typ_normiert=doc.typ_normiert,
fraktionen=doc.fraktionen,
link=doc.link,
now=now_iso,
)
if is_new:
new_this_bl += 1
except Exception:
logger.exception(
"DB-UPSERT fehlgeschlagen für %s/%s — wird übersprungen",
bl.code, getattr(doc, "drucksache", "?"),
)
bl_result.new_count = new_this_bl
await upsert_monitoring_summary(
scan_date=scan_date,
bundesland=bl.code,
total_seen=bl_result.total_seen,
new_count=bl_result.new_count,
errors=None,
)
logger.info(
"%s: %d gesehen, %d neu",
bl.code, bl_result.total_seen, bl_result.new_count,
)
result.results.append(bl_result)
result.new_total = sum(r.new_count for r in result.results)
result.total_seen = sum(r.total_seen for r in result.results)
result.estimated_cost_eur = estimate_cost_qwen_plus(result.new_total)
return result
# ─── Mail-Digest ─────────────────────────────────────────────────────────────
async def run_monitoring_digest(recipient: str) -> dict:
"""Führt daily_scan() durch und verschickt den Ergebnis-Digest per Mail.
Args:
recipient: Empfänger-Adresse (typischerweise der Admin).
Returns:
dict mit Scan-Statistiken + {"mail_sent": bool}.
"""
from .mail import send_mail
from .database import get_monitoring_new_today
from jinja2 import Environment, FileSystemLoader
from pathlib import Path
scan_result = await daily_scan()
# Neue Drucksachen für den heutigen Tag laden
new_docs = await get_monitoring_new_today(scan_result.scan_date)
# Mail-Inhalt via Template rendern
tmpl_dir = Path(__file__).resolve().parent / "templates"
env = Environment(loader=FileSystemLoader(str(tmpl_dir)), autoescape=True)
tmpl = env.get_template("monitoring_digest.html")
html_body = tmpl.render(
scan_date=scan_result.scan_date,
new_total=scan_result.new_total,
total_seen=scan_result.total_seen,
estimated_cost_eur=scan_result.estimated_cost_eur,
results=scan_result.results,
new_docs=new_docs,
errors=scan_result.errors,
)
# Plaintext-Variante
text_body = _render_plain(scan_result, new_docs)
subject = (
f"[GWÖ-Monitor] {scan_result.scan_date}"
f"{scan_result.new_total} neue Drucksachen"
+ (f" ({len(scan_result.errors)} Fehler)" if scan_result.errors else "")
)
mail_sent = False
try:
await send_mail(recipient, subject, text_body, html_body)
mail_sent = True
logger.info("Monitoring-Digest verschickt an %s", recipient)
except Exception:
logger.exception("Monitoring-Digest: Mail-Versand fehlgeschlagen")
return {
"scan_date": scan_result.scan_date,
"new_total": scan_result.new_total,
"total_seen": scan_result.total_seen,
"estimated_cost_eur": scan_result.estimated_cost_eur,
"error_count": len(scan_result.errors),
"mail_sent": mail_sent,
}
def _render_plain(scan_result: DailyScanResult, new_docs: list[dict]) -> str:
"""Baut den Plaintext-Part des Monitoring-Digests."""
from .config import settings
lines = [
f"GWÖ-Antragsprüfer — Monitoring-Digest {scan_result.scan_date}",
"=" * 60,
"",
f"Neue Drucksachen: {scan_result.new_total}",
f"Gesamt gesehen: {scan_result.total_seen}",
f"Kosten-Schätzung: {scan_result.estimated_cost_eur:.4f} EUR",
"",
]
if scan_result.errors:
lines.append(f"Fehler ({len(scan_result.errors)}):")
for e in scan_result.errors:
lines.append(f"{e}")
lines.append("")
lines.append("Bundesland-Übersicht:")
for r in scan_result.results:
status = f"{r.new_count} neu / {r.total_seen} gesehen"
if r.error:
status = f"✗ Fehler: {r.error[:80]}"
lines.append(f" {r.bundesland:6s} {status}")
lines.append("")
if new_docs:
lines.append(f"Neue Drucksachen ({len(new_docs)}):")
for doc in new_docs[:30]:
title = (doc.get("title") or doc.get("drucksache") or "")[:80]
bl = doc.get("bundesland", "")
drucks = doc.get("drucksache", "")
lines.append(f" [{bl}] {drucks}{title}")
if len(new_docs) > 30:
lines.append(f" … und {len(new_docs) - 30} weitere")
lines.append("")
lines.append(f"Webapp: {settings.base_url}")
return "\n".join(lines)
if __name__ == "__main__":
# python -m app.monitoring <empfaenger@example.com>
import sys
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
to = sys.argv[1] if len(sys.argv) > 1 else "mail@tobiasroedel.de"
stats = asyncio.run(run_monitoring_digest(to))
print(f"Monitoring-Scan fertig: {stats}")