antragstracker/scripts/extract_adaptive.py
feat: Initial commit — Antragstracker Hagen
Complete pipeline for analysing municipal Vorlagen (council documents) from ALLRIS:
- OParl import: 20,149 Vorlagen
- PDF extraction: 10,045 full texts (adaptive throttling)
- AI summaries: 10,026 via Qwen Plus (parallelised)
- Beratungsfolge scraper: resolution texts + verbatim minutes
- Voting analysis with coalition matrix
- Georeferencing (Nominatim)

Stack: FastAPI + SvelteKit + SQLite
Deployment: Docker + Traefik on a VServer

Data (DB, logs) are not in the repo; see the Restic backup.
Repo setup: scripts/setup.sh rebuilds everything from the OParl API.

#!/usr/bin/env python3
"""
Adaptive PDF extraction with throttle detection.
Starts conservatively and increases speed until the limit is reached.
Robust logging so a run can be resumed after an abort.
"""
import argparse
import json
import os
import re
import sqlite3
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from threading import Lock

import httpx
import pymupdf

# Netdata metrics HTTP endpoint (VServer)
METRICS_URL = os.environ.get("METRICS_URL", "http://152.53.119.77:8127")

PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
STATE_FILE = PROJECT_ROOT / "data" / "extract_state.json"
LOG_FILE = PROJECT_ROOT / "data" / "extract.log"
METRICS_FILE = PROJECT_ROOT / "data" / "extract_metrics.jsonl"
@dataclass
class AdaptiveConfig:
    """Adaptive throttling configuration."""
    delay: float = 0.2                  # start close to the optimum
    workers: int = 4                    # start ~35% below the optimum (observed optimum: ~6)
    min_delay: float = 0.1              # minimum delay between submissions
    max_workers: int = 15               # hard upper bound
    success_streak: int = 0             # consecutive successes
    streak_threshold: int = 30          # successes in a row required before the next speedup
    cooldown_until: float = 0           # timestamp until which the cooldown lasts
    best_delay_per_worker: dict = field(default_factory=dict)   # worker count -> lowest stable delay
    delay_fully_explored: bool = False  # True once the delay is at its minimum for the current worker level
    throughput_per_worker: dict = field(default_factory=dict)   # worker count -> best observed throughput
    saturation_detected: bool = False   # True once additional workers bring no gain
    saturation_threshold: float = 0.1   # 10% improvement required to justify another worker
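# How the adaptive loop applies this configuration (summary of _speedup/_slowdown below):
#   - after every `streak_threshold` consecutive successes the delay is multiplied
#     by 0.8 until it reaches `min_delay`;
#   - once the delay sits at its minimum, one worker is added (up to `max_workers`)
#     and the delay is reset so the new level can be explored;
#   - if the extra worker improves the smoothed throughput by less than
#     `saturation_threshold`, saturation is recorded and the pool stops growing;
#   - HTTP 429/503 responses trigger a 30 s cooldown, double the delay (capped at
#     2.0 s) and remove one worker.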
@dataclass
class State:
    """Persistent state so an aborted run can be resumed."""
    processed: set = field(default_factory=set)
    failed: dict = field(default_factory=dict)  # vorlage_id -> retry count
    failed_permanent: set = field(default_factory=set)
    started_at: str = ""
    last_update: str = ""
    stats: dict = field(default_factory=lambda: {
        "success": 0, "failed": 0, "retried": 0, "total": 0
    })
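# Shape of the persisted state file as written by _save_state() (values are
# illustrative, not taken from a real run):
#   {
#     "processed": [4711, 4712],
#     "failed": {"4720": 2},          # vorlage_id (as string) -> retry count
#     "failed_permanent": [4730],
#     "started_at": "2025-01-01T10:00:00",
#     "last_update": "2025-01-01T10:05:00",
#     "stats": {"success": 2, "failed": 3, "retried": 1, "total": 10}
#   }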
class AdaptiveExtractor:
    def __init__(self, state_file: Path = STATE_FILE, notify: bool = True):
        self.state_file = state_file
        self.config = AdaptiveConfig()
        self.db_lock = Lock()
        self.log_lock = Lock()
        self.notify = notify
        self.last_notify = 0
        self.notify_interval = 300  # 5 minutes
        self.batch_start_time = None
        self.batch_metrics = []
        self.state = self._load_state()  # must run after log_lock is set up (loading logs)

    def _load_state(self) -> State:
        """Loads the state from disk or creates a fresh one."""
        if self.state_file.exists():
            try:
                data = json.loads(self.state_file.read_text())
                state = State(
                    processed=set(data.get("processed", [])),
                    failed=data.get("failed", {}),
                    failed_permanent=set(data.get("failed_permanent", [])),
                    started_at=data.get("started_at", ""),
                    last_update=data.get("last_update", ""),
                    stats=data.get("stats", State().stats)
                )
                self._log(f"State loaded: {len(state.processed)} processed, {len(state.failed)} pending retries")
                return state
            except Exception as e:
                self._log(f"Loading state failed: {e}")
        return State(started_at=datetime.now().isoformat())

    def _save_state(self):
        """Persists the state to disk."""
        self.state.last_update = datetime.now().isoformat()
        data = {
            "processed": list(self.state.processed),
            "failed": self.state.failed,
            "failed_permanent": list(self.state.failed_permanent),
            "started_at": self.state.started_at,
            "last_update": self.state.last_update,
            "stats": self.state.stats
        }
        self.state_file.write_text(json.dumps(data, indent=2))

    def _log(self, msg: str):
        """Thread-safe logging to stdout and the log file."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{timestamp}] {msg}"
        print(line)
        with self.log_lock:
            with open(LOG_FILE, "a") as f:
                f.write(line + "\n")
    def _record_metric(self, batch_num: int, batch_time: float, success: int, failed: int, bytes_downloaded: int = 0):
        """Records per-batch metrics for later visualisation."""
        mb_downloaded = bytes_downloaded / (1024 * 1024)
        mb_per_sec = mb_downloaded / max(batch_time, 0.1)
        metric = {
            "timestamp": datetime.now().isoformat(),
            "batch": batch_num,
            "batch_time_sec": round(batch_time, 2),
            "success": success,
            "failed": failed,
            "delay": round(self.config.delay, 3),
            "workers": self.config.workers,
            "throughput": round(success / max(batch_time, 0.1), 2),  # docs/sec
            "total_success": self.state.stats["success"],
            "total_failed": len(self.state.failed_permanent),
            "pending_retries": len(self.state.failed),
            "mb_downloaded": round(mb_downloaded, 2),
            "mb_per_sec": round(mb_per_sec, 2),
        }
        self.batch_metrics.append(metric)
        with open(METRICS_FILE, "a") as f:
            f.write(json.dumps(metric) + "\n")
        return metric
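    # One appended line of extract_metrics.jsonl then looks roughly like this
    # (values are illustrative):
    #   {"timestamp": "2025-01-01T10:05:00", "batch": 3, "batch_time_sec": 12.4,
    #    "success": 48, "failed": 2, "delay": 0.16, "workers": 5, "throughput": 3.87,
    #    "total_success": 148, "total_failed": 1, "pending_retries": 4,
    #    "mb_downloaded": 22.1, "mb_per_sec": 1.78}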
    def _push_metrics(self, metric: dict):
        """Pushes metrics via HTTP to the VServer (Netdata statsd bridge)."""
        try:
            payload = {
                "throughput": metric["throughput"],
                "delay": metric["delay"],
                "workers": metric["workers"],
                "success_total": metric["total_success"],
                "failed_total": metric["total_failed"],
                "batch_time": metric["batch_time_sec"],
                "pending_retries": metric["pending_retries"],
                "items_per_sec": metric["throughput"],  # alias
                "mb_per_sec": metric.get("mb_per_sec", 0),
                "mb_downloaded": metric.get("mb_downloaded", 0),
            }
            httpx.post(METRICS_URL, json=payload, timeout=5)
        except Exception:
            pass  # fail silently; metrics must never block the extraction

    def _send_telegram(self, message: str):
        """Logs the notification text."""
        self._log(f"[NOTIFY] {message[:100]}...")

    def _maybe_notify(self, force: bool = False):
        """Sends periodic updates to Telegram."""
        if not self.notify:
            return
        now = time.time()
        if not force and (now - self.last_notify) < self.notify_interval:
            return
        self.last_notify = now
        # latest metrics
        if not self.batch_metrics:
            return
        recent = self.batch_metrics[-1]
        elapsed = (datetime.now() - datetime.fromisoformat(self.state.started_at)).total_seconds() / 60
        # throughput trend (last 5 batches)
        recent_throughputs = [m["throughput"] for m in self.batch_metrics[-5:]]
        avg_throughput = sum(recent_throughputs) / len(recent_throughputs)
        # ETA
        remaining = self.state.stats["total"] - self.state.stats["success"] - len(self.state.failed_permanent)
        eta_min = remaining / max(avg_throughput * 60, 0.1)
        msg = f"""📊 *PDF extraction update*
✓ Success: {self.state.stats['success']:,}
✗ Failed: {len(self.state.failed_permanent)}
↻ Retries: {len(self.state.failed)}
⚡ Config: {self.config.workers} workers, {self.config.delay:.2f}s delay
📈 Throughput: {avg_throughput:.1f} docs/sec
⏱️ Runtime: {elapsed:.0f} min
🎯 ETA: ~{eta_min:.0f} min
Batch {recent['batch']}: {recent['success']}✓ {recent['failed']}✗ in {recent['batch_time_sec']}s"""
        self._send_telegram(msg)
    def _get_db(self):
        conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
        conn.row_factory = sqlite3.Row
        return conn
    def _download_and_extract(self, vorlage_id: int, url: str) -> tuple[int, str | None, str | None, int]:
        """Downloads a PDF and extracts its text. Returns (vorlage_id, text, error, bytes_downloaded)."""
        try:
            resp = httpx.get(url, timeout=60, follow_redirects=True)
            # throttling detection
            if resp.status_code in (429, 503):
                return (vorlage_id, None, f"THROTTLED:{resp.status_code}", 0)
            resp.raise_for_status()
            content_size = len(resp.content)
            if content_size < 100:
                return (vorlage_id, None, "PDF too small", content_size)
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp:
                tmp.write(resp.content)
                tmp.flush()
                doc = pymupdf.open(tmp.name)
                text_parts = []
                for page in doc:
                    text_parts.append(page.get_text())
                doc.close()
            text = "\n".join(text_parts).strip()
            if len(text) < 50:
                return (vorlage_id, None, "No text", content_size)
            # normalise whitespace
            text = re.sub(r'\n{3,}', '\n\n', text)
            text = re.sub(r' {2,}', ' ', text)
            return (vorlage_id, text, None, content_size)
        except httpx.HTTPStatusError as e:
            return (vorlage_id, None, f"HTTP:{e.response.status_code}", 0)
        except Exception as e:
            return (vorlage_id, None, str(e)[:80], 0)
    def _handle_success(self, vorlage_id: int, text: str):
        """Handles a successful extraction."""
        with self.db_lock:
            conn = self._get_db()
            conn.execute("""
                UPDATE vorlagen SET volltext = ?, volltext_clean = ?
                WHERE id = ?
            """, (text, text, vorlage_id))
            conn.execute("UPDATE anlagen SET downloaded = 1 WHERE vorlage_id = ?", (vorlage_id,))
            conn.commit()
            conn.close()
        self.state.processed.add(vorlage_id)
        self.state.stats["success"] += 1
        self.config.success_streak += 1
        # adaptive speedup
        if self.config.success_streak >= self.config.streak_threshold:
            self._speedup()
            self.config.success_streak = 0

    def _handle_failure(self, vorlage_id: int, error: str):
        """Handles a failure with retry logic."""
        retry_count = self.state.failed.get(str(vorlage_id), 0) + 1
        if "THROTTLED" in error:
            # activate the cooldown
            self._slowdown(severe=True)
            self.state.failed[str(vorlage_id)] = retry_count
            return
        if retry_count >= 3:
            self.state.failed_permanent.add(vorlage_id)
            if str(vorlage_id) in self.state.failed:
                del self.state.failed[str(vorlage_id)]
            self._log(f"  ✗ #{vorlage_id} permanently failed: {error}")
        else:
            self.state.failed[str(vorlage_id)] = retry_count
            self._log(f"  ↻ #{vorlage_id} retry {retry_count}/3: {error}")
        self.state.stats["failed"] += 1
        self.config.success_streak = 0
    def _speedup(self):
        """Increases speed with per-worker-level exploration and saturation detection."""
        old_delay = self.config.delay
        old_workers = self.config.workers
        # remember the current delay as stable for this worker level
        w = self.config.workers
        if w not in self.config.best_delay_per_worker:
            self.config.best_delay_per_worker[w] = self.config.delay
        else:
            self.config.best_delay_per_worker[w] = min(
                self.config.best_delay_per_worker[w],
                self.config.delay
            )
        # track the current throughput for this worker level
        if self.batch_metrics:
            recent_throughput = sum(m["throughput"] for m in self.batch_metrics[-3:]) / min(3, len(self.batch_metrics))
            if w not in self.config.throughput_per_worker:
                self.config.throughput_per_worker[w] = recent_throughput
            else:
                # exponential moving average
                self.config.throughput_per_worker[w] = (
                    self.config.throughput_per_worker[w] * 0.7 + recent_throughput * 0.3
                )
        if self.config.delay > self.config.min_delay:
            # delay not yet at its minimum → keep reducing it
            self.config.delay = max(self.config.min_delay, self.config.delay * 0.8)
            self.config.delay_fully_explored = False
        elif self.config.saturation_detected:
            # saturation detected → stop scaling up
            self._log(f"📊 Saturation at {self.config.workers} workers: adding more gains nothing")
        elif self.config.workers < self.config.max_workers:
            # check whether the last worker increase actually improved throughput
            prev_throughput = self.config.throughput_per_worker.get(w - 1, 0)
            curr_throughput = self.config.throughput_per_worker.get(w, 0)
            if prev_throughput > 0 and curr_throughput > 0:
                improvement = (curr_throughput - prev_throughput) / prev_throughput
                if improvement < self.config.saturation_threshold:
                    # less than 10% improvement → saturation
                    self.config.saturation_detected = True
                    self._log(f"📊 Saturation detected: {w-1}→{w} workers gained only +{improvement*100:.1f}% throughput")
                    return
            # add a worker
            self.config.workers += 1
            prev_best = self.config.best_delay_per_worker.get(w, 0.5)
            self.config.delay = max(prev_best, 0.3)
            self.config.delay_fully_explored = False
            self._log(f"🔄 New worker level: delay reset to {self.config.delay:.2f}s for exploration")
        else:
            self.config.delay_fully_explored = True
        if old_delay != self.config.delay or old_workers != self.config.workers:
            self._log(f"⚡ Speedup: delay={self.config.delay:.2f}s, workers={self.config.workers}")
    def _slowdown(self, severe: bool = False):
        """Slows down when problems occur."""
        if severe:
            self.config.cooldown_until = time.time() + 30  # 30 s pause
            self.config.delay = min(2.0, self.config.delay * 2)
            self.config.workers = max(1, self.config.workers - 1)
            self.config.delay_fully_explored = False  # restart exploration
            self._log(f"🛑 Throttled! Cooldown 30s, delay={self.config.delay:.2f}s, workers={self.config.workers}")
        else:
            self.config.delay = min(2.0, self.config.delay * 1.2)
            self._log(f"⚠️ Slowdown: delay={self.config.delay:.2f}s")

    def _wait_cooldown(self):
        """Waits for an active cooldown to expire."""
        if self.config.cooldown_until > time.time():
            wait = self.config.cooldown_until - time.time()
            self._log(f"⏳ Cooldown: waiting {wait:.0f}s...")
            time.sleep(wait)
    def get_pending(self, limit: int) -> list[dict]:
        """Fetches the Vorlagen that still need to be processed."""
        conn = self._get_db()
        # everything with a URL but without full text that has not been handled yet
        processed_ids = self.state.processed | self.state.failed_permanent
        query = """
            SELECT a.vorlage_id, a.url
            FROM anlagen a
            JOIN vorlagen v ON a.vorlage_id = v.id
            WHERE a.url IS NOT NULL
              AND a.downloaded = 0
              AND (v.volltext_clean IS NULL OR v.volltext_clean = '')
            ORDER BY v.datum_eingang DESC
        """
        all_pending = conn.execute(query).fetchall()
        conn.close()
        # filter out IDs that are already done
        result = []
        for row in all_pending:
            if row['vorlage_id'] not in processed_ids:
                result.append(dict(row))
                if len(result) >= limit:
                    break
        # append retries (at the end)
        for vid_str, count in list(self.state.failed.items()):
            if len(result) >= limit:
                break
            vid = int(vid_str)
            # look the URL up again
            conn = self._get_db()
            row = conn.execute("SELECT vorlage_id, url FROM anlagen WHERE vorlage_id = ?", (vid,)).fetchone()
            conn.close()
            if row:
                result.append(dict(row))
                self.state.stats["retried"] += 1
        return result
    def run(self, limit: int = 1000):
        """Main loop."""
        self._log("=== Adaptive extraction started ===")
        self._log(f"Limit: {limit}, start config: delay={self.config.delay}s, workers={self.config.workers}")
        pending = self.get_pending(limit)
        self.state.stats["total"] = len(pending)
        self._log(f"To process: {len(pending)}")
        if not pending:
            self._log("Nothing to do!")
            return
        batch_size = 50
        processed_count = 0
        batch_num = 0
        for i in range(0, len(pending), batch_size):
            self._wait_cooldown()
            batch = pending[i:i + batch_size]
            batch_num += 1
            batch_start = time.time()
            batch_success = 0
            batch_failed = 0
            batch_bytes = 0
            self._log(f"\n--- Batch {batch_num}: {len(batch)} Vorlagen ---")
            self._log(f"Config: delay={self.config.delay:.2f}s, workers={self.config.workers}")
            with ThreadPoolExecutor(max_workers=self.config.workers) as executor:
                futures = {}
                for item in batch:
                    time.sleep(self.config.delay)
                    future = executor.submit(
                        self._download_and_extract,
                        item['vorlage_id'],
                        item['url']
                    )
                    futures[future] = item
                for future in as_completed(futures):
                    vorlage_id, text, error, bytes_dl = future.result()
                    batch_bytes += bytes_dl
                    if text:
                        self._handle_success(vorlage_id, text)
                        self._log(f"  ✓ #{vorlage_id}: {len(text)} characters")
                        batch_success += 1
                    else:
                        self._handle_failure(vorlage_id, error)
                        batch_failed += 1
                    processed_count += 1
            # record metrics
            batch_time = time.time() - batch_start
            metric = self._record_metric(batch_num, batch_time, batch_success, batch_failed, batch_bytes)
            # push to Netdata statsd
            self._push_metrics(metric)
            # save the state after every batch
            self._save_state()
            # progress
            stats = self.state.stats
            self._log(f"Progress: {processed_count}/{len(pending)} | ✓{stats['success']} ✗{len(self.state.failed_permanent)} | {batch_success/max(batch_time,0.1):.1f} docs/sec")
            # Telegram update (every 5 min)
            self._maybe_notify()
        self._log("\n=== Finished ===")
        self._log(f"Successful: {self.state.stats['success']}")
        self._log(f"Failed: {len(self.state.failed_permanent)}")
        self._log(f"State saved: {self.state_file}")
        # final notification
        self._maybe_notify(force=True)
        if self.notify:
            self._send_telegram(f"✅ *PDF extraction finished*\n\n{self.state.stats['success']:,} successful\n{len(self.state.failed_permanent)} failed")
def main():
    parser = argparse.ArgumentParser(description="Adaptive PDF extraction")
    parser.add_argument("--limit", type=int, default=1000, help="Maximum number of Vorlagen to process")
    parser.add_argument("--reset", action="store_true", help="Reset the saved state")
    parser.add_argument("--no-notify", action="store_true", help="Disable Telegram updates")
    parser.add_argument("--notify-interval", type=int, default=300, help="Seconds between updates")
    args = parser.parse_args()

    if args.reset and STATE_FILE.exists():
        STATE_FILE.unlink()
        print("State reset")
    # also clear the metrics file on reset
    if args.reset and METRICS_FILE.exists():
        METRICS_FILE.unlink()

    extractor = AdaptiveExtractor(notify=not args.no_notify)
    extractor.notify_interval = args.notify_interval
    extractor.run(limit=args.limit)


if __name__ == "__main__":
    main()
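# A quick way to eyeball the recorded curves afterwards (sketch; assumes pandas
# and matplotlib are installed in the environment, which this script does not
# require itself):
#   import pandas as pd
#   df = pd.read_json("data/extract_metrics.jsonl", lines=True)
#   df.plot(x="batch", y=["throughput", "mb_per_sec"])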