- monitoring.py: taeglicher Scan-Adapter aller aktiven BL, kein Auto-Fetch (#135) - monitoring_digest.html: Mail-Template mit '0-Kontext'-Hinweis - abgeordnetenwatch.py + sync_*.py: Phase 1 Roll-Call-Voting (#106) - 17 Parlamente (16 BL + BT) - 9 BL-spezifische Drucksachen-Patterns + Date-Title-Fallback - 28977 Votes fuer BUND in DB - wahlprogramm_check.py: fehlende Programme erkennen (#128) - NI-Skip-Liste, NRW Empty-Query-Fallback Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
333 lines
12 KiB
Python
333 lines
12 KiB
Python
"""Täglicher Monitoring-Scan für neue Landtags-Drucksachen (#135).
|
|
|
|
Nur Metadaten — kein PDF-Download, kein LLM-Call.
|
|
|
|
Ablauf:
|
|
1. Iteriert alle aktiven Bundesländer via aktive_bundeslaender().
|
|
2. Ruft adapter.search("", limit=50) (Fallback: " " oder "*") auf.
|
|
3. UPSERTs Treffer in monitoring_scans. seen_first_at bleibt stabil,
|
|
last_seen_at wird immer gesetzt.
|
|
4. Aggregiert Ergebnisse in monitoring_daily_summary.
|
|
5. Gibt ScanResult zurück, aus dem run_monitoring_digest() den
|
|
Mail-Digest baut.
|
|
|
|
Kosten-Schätzung (Qwen Plus, Stand April 2026):
|
|
Quelle: https://help.aliyun.com/zh/dashscope/developer-reference/tongyi-qianwen-7b-14b-72b-api-pricing
|
|
Input: 0.0004 USD / 1 K Token
|
|
Output: 0.0012 USD / 1 K Token
|
|
Kurs: 1 USD = 0.93 EUR (Näherung April 2026)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
|
|
from .bundeslaender import aktive_bundeslaender
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ─── Kosten-Schätzung ────────────────────────────────────────────────────────
|
|
# Preise aus DashScope-Dokumentation (USD, Stand April 2026):
|
|
# https://help.aliyun.com/zh/dashscope/developer-reference/tongyi-qianwen-7b-14b-72b-api-pricing
|
|
_QWEN_PLUS_INPUT_USD_PER_1K = 0.0004
|
|
_QWEN_PLUS_OUTPUT_USD_PER_1K = 0.0012
|
|
_USD_TO_EUR = 0.93 # Näherungskurs April 2026 (als Konstante OK für Schätzung)
|
|
|
|
# Default-Annahmen pro Analyse (Durchschnittswerte aus Produktionsbetrieb)
|
|
_DEFAULT_AVG_IN_TOKENS = 20_000
|
|
_DEFAULT_AVG_OUT_TOKENS = 3_000
|
|
|
|
|
|
def estimate_cost_qwen_plus(
|
|
n_new: int,
|
|
avg_in_tokens: int = _DEFAULT_AVG_IN_TOKENS,
|
|
avg_out_tokens: int = _DEFAULT_AVG_OUT_TOKENS,
|
|
) -> float:
|
|
"""Schätzt die Analysekosten in EUR für n_new neue Drucksachen (Qwen Plus).
|
|
|
|
Rechnet auf Basis der offiziellen DashScope-Preise, Umrechnung USD→EUR
|
|
mit festem Näherungskurs. Ergebnis ist eine Schätzung, keine Garantie.
|
|
|
|
Args:
|
|
n_new: Anzahl neuer Drucksachen.
|
|
avg_in_tokens: Durchschnittliche Input-Token pro Antrag (Default 20 000).
|
|
avg_out_tokens: Durchschnittliche Output-Token pro Antrag (Default 3 000).
|
|
|
|
Returns:
|
|
Geschätzte Kosten in EUR.
|
|
"""
|
|
if n_new <= 0:
|
|
return 0.0
|
|
input_cost_usd = (avg_in_tokens / 1000) * _QWEN_PLUS_INPUT_USD_PER_1K * n_new
|
|
output_cost_usd = (avg_out_tokens / 1000) * _QWEN_PLUS_OUTPUT_USD_PER_1K * n_new
|
|
total_eur = (input_cost_usd + output_cost_usd) * _USD_TO_EUR
|
|
return round(total_eur, 4)
|
|
|
|
|
|
# ─── Datenklassen ────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
|
|
class BundeslandScanResult:
|
|
"""Scan-Ergebnis für ein einzelnes Bundesland."""
|
|
bundesland: str
|
|
total_seen: int = 0
|
|
new_count: int = 0
|
|
error: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class DailyScanResult:
|
|
"""Gesamtergebnis eines daily_scan()-Laufs."""
|
|
scan_date: str # YYYY-MM-DD
|
|
results: list[BundeslandScanResult] = field(default_factory=list)
|
|
new_total: int = 0 # Summe aller new_count
|
|
total_seen: int = 0 # Summe aller total_seen
|
|
estimated_cost_eur: float = 0.0
|
|
errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
# ─── Adapter-Suche ───────────────────────────────────────────────────────────
|
|
|
|
DEFAULT_DAILY_LIMIT = 50
|
|
|
|
# Bundesländer, die vom täglichen Monitoring-Scan ausgenommen sind.
|
|
# NI (Niedersachsen): NILAS-Portal erfordert Login — unauthentifizierte Anfragen
|
|
# liefern Login-Page-HTML, das der JSON-Comment-Parser als ~50 Junk-Records parsed.
|
|
# Ausnahme bleibt bis ein gültiger HAR-Capture vorliegt (siehe Issue #22).
|
|
_MONITORING_SKIP: frozenset[str] = frozenset({"NI"})
|
|
|
|
|
|
async def _search_adapter(adapter, bundesland_code: str, limit: int = DEFAULT_DAILY_LIMIT) -> list:
|
|
"""Sucht via Adapter nach aktuellen Drucksachen.
|
|
|
|
Probiert der Reihe nach: leerer String, Leerzeichen, Sternchen —
|
|
und fängt alle Exceptions ab, damit ein Adapter-Fehler den
|
|
Gesamt-Scan nicht abbricht. ``limit`` steuert pro-Adapter-Obergrenze;
|
|
für Initial-Seeding ggf. höher setzen.
|
|
"""
|
|
for query in ("", " ", "*"):
|
|
try:
|
|
results = await adapter.search(query, limit=limit)
|
|
return results
|
|
except Exception as e:
|
|
if query == "*":
|
|
# Alle Versuche gescheitert — Exception nach oben durchreichen
|
|
raise
|
|
logger.debug(
|
|
"%s: search(%r) fehlgeschlagen (%s), versuche nächsten Query",
|
|
bundesland_code, query, e,
|
|
)
|
|
return []
|
|
|
|
|
|
# ─── Haupt-Scan ──────────────────────────────────────────────────────────────
|
|
|
|
async def daily_scan(limit: int = DEFAULT_DAILY_LIMIT) -> DailyScanResult:
|
|
"""Täglicher Scan aller aktiven Bundesländer nach neuen Drucksachen.
|
|
|
|
Kein PDF-Download, kein LLM-Call — nur Metadaten. ``limit`` gilt
|
|
pro Adapter; für Initial-Seeding größer setzen (z.B. 500).
|
|
"""
|
|
from .parlamente import ADAPTERS
|
|
from .database import upsert_monitoring_scan, upsert_monitoring_summary
|
|
|
|
now_utc = datetime.now(timezone.utc)
|
|
scan_date = now_utc.strftime("%Y-%m-%d")
|
|
now_iso = now_utc.strftime("%Y-%m-%dT%H:%M:%S")
|
|
|
|
result = DailyScanResult(scan_date=scan_date)
|
|
|
|
active_bls = aktive_bundeslaender()
|
|
|
|
for bl in active_bls:
|
|
if bl.code in _MONITORING_SKIP:
|
|
logger.debug("%s: Monitoring-Skip aktiv — übersprungen", bl.code)
|
|
continue
|
|
|
|
adapter = ADAPTERS.get(bl.code)
|
|
if adapter is None:
|
|
logger.debug("Kein Adapter für %s — übersprungen", bl.code)
|
|
continue
|
|
|
|
bl_result = BundeslandScanResult(bundesland=bl.code)
|
|
|
|
try:
|
|
docs = await _search_adapter(adapter, bl.code, limit=limit)
|
|
except Exception as exc:
|
|
err_msg = f"{type(exc).__name__}: {str(exc)[:500]}"
|
|
logger.exception("Adapter-Fehler bei %s", bl.code)
|
|
bl_result.error = err_msg
|
|
result.errors.append(f"{bl.code}: {err_msg}")
|
|
await upsert_monitoring_summary(
|
|
scan_date=scan_date,
|
|
bundesland=bl.code,
|
|
total_seen=0,
|
|
new_count=0,
|
|
errors=err_msg,
|
|
)
|
|
result.results.append(bl_result)
|
|
continue
|
|
|
|
bl_result.total_seen = len(docs)
|
|
new_this_bl = 0
|
|
|
|
for doc in docs:
|
|
try:
|
|
is_new = await upsert_monitoring_scan(
|
|
bundesland=doc.bundesland,
|
|
drucksache=doc.drucksache,
|
|
title=doc.title,
|
|
datum=doc.datum,
|
|
typ=doc.typ,
|
|
typ_normiert=doc.typ_normiert,
|
|
fraktionen=doc.fraktionen,
|
|
link=doc.link,
|
|
now=now_iso,
|
|
)
|
|
if is_new:
|
|
new_this_bl += 1
|
|
except Exception:
|
|
logger.exception(
|
|
"DB-UPSERT fehlgeschlagen für %s/%s — wird übersprungen",
|
|
bl.code, getattr(doc, "drucksache", "?"),
|
|
)
|
|
|
|
bl_result.new_count = new_this_bl
|
|
|
|
await upsert_monitoring_summary(
|
|
scan_date=scan_date,
|
|
bundesland=bl.code,
|
|
total_seen=bl_result.total_seen,
|
|
new_count=bl_result.new_count,
|
|
errors=None,
|
|
)
|
|
|
|
logger.info(
|
|
"%s: %d gesehen, %d neu",
|
|
bl.code, bl_result.total_seen, bl_result.new_count,
|
|
)
|
|
result.results.append(bl_result)
|
|
|
|
result.new_total = sum(r.new_count for r in result.results)
|
|
result.total_seen = sum(r.total_seen for r in result.results)
|
|
result.estimated_cost_eur = estimate_cost_qwen_plus(result.new_total)
|
|
|
|
return result
|
|
|
|
|
|
# ─── Mail-Digest ─────────────────────────────────────────────────────────────
|
|
|
|
async def run_monitoring_digest(recipient: str) -> dict:
|
|
"""Führt daily_scan() durch und verschickt den Ergebnis-Digest per Mail.
|
|
|
|
Args:
|
|
recipient: Empfänger-Adresse (typischerweise der Admin).
|
|
|
|
Returns:
|
|
dict mit Scan-Statistiken + {"mail_sent": bool}.
|
|
"""
|
|
from .mail import send_mail
|
|
from .database import get_monitoring_new_today
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from pathlib import Path
|
|
|
|
scan_result = await daily_scan()
|
|
|
|
# Neue Drucksachen für den heutigen Tag laden
|
|
new_docs = await get_monitoring_new_today(scan_result.scan_date)
|
|
|
|
# Mail-Inhalt via Template rendern
|
|
tmpl_dir = Path(__file__).resolve().parent / "templates"
|
|
env = Environment(loader=FileSystemLoader(str(tmpl_dir)), autoescape=True)
|
|
tmpl = env.get_template("monitoring_digest.html")
|
|
|
|
html_body = tmpl.render(
|
|
scan_date=scan_result.scan_date,
|
|
new_total=scan_result.new_total,
|
|
total_seen=scan_result.total_seen,
|
|
estimated_cost_eur=scan_result.estimated_cost_eur,
|
|
results=scan_result.results,
|
|
new_docs=new_docs,
|
|
errors=scan_result.errors,
|
|
)
|
|
|
|
# Plaintext-Variante
|
|
text_body = _render_plain(scan_result, new_docs)
|
|
|
|
subject = (
|
|
f"[GWÖ-Monitor] {scan_result.scan_date} — "
|
|
f"{scan_result.new_total} neue Drucksachen"
|
|
+ (f" ({len(scan_result.errors)} Fehler)" if scan_result.errors else "")
|
|
)
|
|
|
|
mail_sent = False
|
|
try:
|
|
await send_mail(recipient, subject, text_body, html_body)
|
|
mail_sent = True
|
|
logger.info("Monitoring-Digest verschickt an %s", recipient)
|
|
except Exception:
|
|
logger.exception("Monitoring-Digest: Mail-Versand fehlgeschlagen")
|
|
|
|
return {
|
|
"scan_date": scan_result.scan_date,
|
|
"new_total": scan_result.new_total,
|
|
"total_seen": scan_result.total_seen,
|
|
"estimated_cost_eur": scan_result.estimated_cost_eur,
|
|
"error_count": len(scan_result.errors),
|
|
"mail_sent": mail_sent,
|
|
}
|
|
|
|
|
|
def _render_plain(scan_result: DailyScanResult, new_docs: list[dict]) -> str:
|
|
"""Baut den Plaintext-Part des Monitoring-Digests."""
|
|
from .config import settings
|
|
|
|
lines = [
|
|
f"GWÖ-Antragsprüfer — Monitoring-Digest {scan_result.scan_date}",
|
|
"=" * 60,
|
|
"",
|
|
f"Neue Drucksachen: {scan_result.new_total}",
|
|
f"Gesamt gesehen: {scan_result.total_seen}",
|
|
f"Kosten-Schätzung: {scan_result.estimated_cost_eur:.4f} EUR",
|
|
"",
|
|
]
|
|
|
|
if scan_result.errors:
|
|
lines.append(f"Fehler ({len(scan_result.errors)}):")
|
|
for e in scan_result.errors:
|
|
lines.append(f" • {e}")
|
|
lines.append("")
|
|
|
|
lines.append("Bundesland-Übersicht:")
|
|
for r in scan_result.results:
|
|
status = f"✓ {r.new_count} neu / {r.total_seen} gesehen"
|
|
if r.error:
|
|
status = f"✗ Fehler: {r.error[:80]}"
|
|
lines.append(f" {r.bundesland:6s} {status}")
|
|
lines.append("")
|
|
|
|
if new_docs:
|
|
lines.append(f"Neue Drucksachen ({len(new_docs)}):")
|
|
for doc in new_docs[:30]:
|
|
title = (doc.get("title") or doc.get("drucksache") or "")[:80]
|
|
bl = doc.get("bundesland", "")
|
|
drucks = doc.get("drucksache", "")
|
|
lines.append(f" [{bl}] {drucks} — {title}")
|
|
if len(new_docs) > 30:
|
|
lines.append(f" … und {len(new_docs) - 30} weitere")
|
|
lines.append("")
|
|
|
|
lines.append(f"Webapp: {settings.base_url}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# python -m app.monitoring <empfaenger@example.com>
|
|
import sys
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
to = sys.argv[1] if len(sys.argv) > 1 else "mail@tobiasroedel.de"
|
|
stats = asyncio.run(run_monitoring_digest(to))
|
|
print(f"Monitoring-Scan fertig: {stats}")
|