#!/usr/bin/env python3 """ Adaptive PDF-Extraktion mit Throttle-Detection. Startet konservativ und erhöht Geschwindigkeit bis zum Limit. Robustes Logging für Wiederaufnahme nach Abbruch. """ import argparse import json import os import sqlite3 import tempfile import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from threading import Lock import httpx import pymupdf # Netdata Metrics HTTP Endpoint (VServer) METRICS_URL = os.environ.get("METRICS_URL", "http://152.53.119.77:8127") PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db" STATE_FILE = PROJECT_ROOT / "data" / "extract_state.json" LOG_FILE = PROJECT_ROOT / "data" / "extract.log" METRICS_FILE = PROJECT_ROOT / "data" / "extract_metrics.jsonl" @dataclass class AdaptiveConfig: """Adaptive Throttling-Konfiguration.""" delay: float = 0.2 # Start nahe Optimum workers: int = 4 # Start bei ~optimal-35% (basierend auf Daten: optimal ~6) min_delay: float = 0.1 # Minimaler Delay max_workers: int = 15 # Hartes Maximum success_streak: int = 0 # Erfolge in Folge streak_threshold: int = 30 # Erfolge bis Speedup cooldown_until: float = 0 # Timestamp bis Cooldown endet best_delay_per_worker: dict = field(default_factory=dict) # worker_count -> min stable delay delay_fully_explored: bool = False # True wenn delay bei aktuellem worker-level am min throughput_per_worker: dict = field(default_factory=dict) # worker_count -> best throughput saturation_detected: bool = False # True wenn mehr Workers keinen Gewinn bringen saturation_threshold: float = 0.1 # 10% Verbesserung nötig für neuen Worker @dataclass class State: """Persistenter Zustand für Wiederaufnahme.""" processed: set = field(default_factory=set) failed: dict = field(default_factory=dict) # vorlage_id -> retry_count failed_permanent: set = field(default_factory=set) started_at: str = "" last_update: str = "" stats: dict = field(default_factory=lambda: { "success": 0, "failed": 0, "retried": 0, "total": 0 }) class AdaptiveExtractor: def __init__(self, state_file: Path = STATE_FILE, notify: bool = True): self.state_file = state_file self.config = AdaptiveConfig() self.db_lock = Lock() self.log_lock = Lock() self.notify = notify self.last_notify = 0 self.notify_interval = 300 # 5 Minuten self.batch_start_time = None self.batch_metrics = [] self.state = self._load_state() # Must be after log_lock init def _load_state(self) -> State: """Lädt Zustand aus Datei oder erstellt neuen.""" if self.state_file.exists(): try: data = json.loads(self.state_file.read_text()) state = State( processed=set(data.get("processed", [])), failed=data.get("failed", {}), failed_permanent=set(data.get("failed_permanent", [])), started_at=data.get("started_at", ""), last_update=data.get("last_update", ""), stats=data.get("stats", State().stats) ) self._log(f"State geladen: {len(state.processed)} verarbeitet, {len(state.failed)} pending retries") return state except Exception as e: self._log(f"State-Laden fehlgeschlagen: {e}") return State(started_at=datetime.now().isoformat()) def _save_state(self): """Speichert Zustand.""" self.state.last_update = datetime.now().isoformat() data = { "processed": list(self.state.processed), "failed": self.state.failed, "failed_permanent": list(self.state.failed_permanent), "started_at": self.state.started_at, "last_update": self.state.last_update, "stats": self.state.stats } self.state_file.write_text(json.dumps(data, indent=2)) def _log(self, msg: str): """Thread-safe Logging.""" timestamp = datetime.now().strftime("%H:%M:%S") line = f"[{timestamp}] {msg}" print(line) with self.log_lock: with open(LOG_FILE, "a") as f: f.write(line + "\n") def _record_metric(self, batch_num: int, batch_time: float, success: int, failed: int, bytes_downloaded: int = 0): """Speichert Metriken für Visualisierung.""" mb_downloaded = bytes_downloaded / (1024 * 1024) mb_per_sec = mb_downloaded / max(batch_time, 0.1) metric = { "timestamp": datetime.now().isoformat(), "batch": batch_num, "batch_time_sec": round(batch_time, 2), "success": success, "failed": failed, "delay": round(self.config.delay, 3), "workers": self.config.workers, "throughput": round(success / max(batch_time, 0.1), 2), # docs/sec "total_success": self.state.stats["success"], "total_failed": len(self.state.failed_permanent), "pending_retries": len(self.state.failed), "mb_downloaded": round(mb_downloaded, 2), "mb_per_sec": round(mb_per_sec, 2), } self.batch_metrics.append(metric) with open(METRICS_FILE, "a") as f: f.write(json.dumps(metric) + "\n") return metric def _push_metrics(self, metric: dict): """Pusht Metriken per HTTP an VServer → Netdata Statsd.""" try: payload = { "throughput": metric["throughput"], "delay": metric["delay"], "workers": metric["workers"], "success_total": metric["total_success"], "failed_total": metric["total_failed"], "batch_time": metric["batch_time_sec"], "pending_retries": metric["pending_retries"], "items_per_sec": metric["throughput"], # Alias "mb_per_sec": metric.get("mb_per_sec", 0), "mb_downloaded": metric.get("mb_downloaded", 0), } httpx.post(METRICS_URL, json=payload, timeout=5) except Exception as e: pass # Silent fail, don't block extraction def _send_telegram(self, message: str): """Loggt Update.""" self._log(f"[NOTIFY] {message[:100]}...") def _maybe_notify(self, force: bool = False): """Sendet periodische Updates nach Telegram.""" if not self.notify: return now = time.time() if not force and (now - self.last_notify) < self.notify_interval: return self.last_notify = now # Letzte Metriken if not self.batch_metrics: return recent = self.batch_metrics[-1] elapsed = (datetime.now() - datetime.fromisoformat(self.state.started_at)).total_seconds() / 60 # Throughput-Trend (letzte 5 Batches) recent_throughputs = [m["throughput"] for m in self.batch_metrics[-5:]] avg_throughput = sum(recent_throughputs) / len(recent_throughputs) # ETA remaining = self.state.stats["total"] - self.state.stats["success"] - len(self.state.failed_permanent) eta_min = remaining / max(avg_throughput * 60, 0.1) msg = f"""📊 *PDF-Extraktion Update* ✓ Erfolg: {self.state.stats['success']:,} ✗ Fehler: {len(self.state.failed_permanent)} ↻ Retries: {len(self.state.failed)} ⚡ Config: {self.config.workers} workers, {self.config.delay:.2f}s delay 📈 Throughput: {avg_throughput:.1f} docs/sec ⏱️ Laufzeit: {elapsed:.0f} min 🎯 ETA: ~{eta_min:.0f} min Batch {recent['batch']}: {recent['success']}✓ {recent['failed']}✗ in {recent['batch_time_sec']}s""" self._send_telegram(msg) def _get_db(self): conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row return conn def _download_and_extract(self, vorlage_id: int, url: str) -> tuple[int, str | None, str | None, int]: """Lädt PDF und extrahiert Text. Returns: (vorlage_id, text, error, bytes_downloaded)""" try: resp = httpx.get(url, timeout=60, follow_redirects=True) # Throttling-Detection if resp.status_code in (429, 503): return (vorlage_id, None, f"THROTTLED:{resp.status_code}", 0) resp.raise_for_status() content_size = len(resp.content) if content_size < 100: return (vorlage_id, None, "PDF zu klein", content_size) with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp: tmp.write(resp.content) tmp.flush() doc = pymupdf.open(tmp.name) text_parts = [] for page in doc: text_parts.append(page.get_text()) doc.close() text = "\n".join(text_parts).strip() if len(text) < 50: return (vorlage_id, None, "Kein Text", content_size) # Bereinigen import re text = re.sub(r'\n{3,}', '\n\n', text) text = re.sub(r' {2,}', ' ', text) return (vorlage_id, text, None, content_size) except httpx.HTTPStatusError as e: return (vorlage_id, None, f"HTTP:{e.response.status_code}", 0) except Exception as e: return (vorlage_id, None, str(e)[:80], 0) def _handle_success(self, vorlage_id: int, text: str): """Verarbeitet erfolgreiche Extraktion.""" with self.db_lock: conn = self._get_db() conn.execute(""" UPDATE vorlagen SET volltext = ?, volltext_clean = ? WHERE id = ? """, (text, text, vorlage_id)) conn.execute("UPDATE anlagen SET downloaded = 1 WHERE vorlage_id = ?", (vorlage_id,)) conn.commit() conn.close() self.state.processed.add(vorlage_id) self.state.stats["success"] += 1 self.config.success_streak += 1 # Adaptive Speedup if self.config.success_streak >= self.config.streak_threshold: self._speedup() self.config.success_streak = 0 def _handle_failure(self, vorlage_id: int, error: str): """Verarbeitet Fehler mit Retry-Logik.""" retry_count = self.state.failed.get(str(vorlage_id), 0) + 1 if "THROTTLED" in error: # Cooldown aktivieren self._slowdown(severe=True) self.state.failed[str(vorlage_id)] = retry_count return if retry_count >= 3: self.state.failed_permanent.add(vorlage_id) if str(vorlage_id) in self.state.failed: del self.state.failed[str(vorlage_id)] self._log(f" ✗ #{vorlage_id} permanent failed: {error}") else: self.state.failed[str(vorlage_id)] = retry_count self._log(f" ↻ #{vorlage_id} retry {retry_count}/3: {error}") self.state.stats["failed"] += 1 self.config.success_streak = 0 def _speedup(self): """Erhöht Geschwindigkeit mit Worker-Level-Exploration und Sättigungs-Erkennung.""" old_delay = self.config.delay old_workers = self.config.workers # Aktuellen Delay als stabil für diesen Worker-Level merken w = self.config.workers if w not in self.config.best_delay_per_worker: self.config.best_delay_per_worker[w] = self.config.delay else: self.config.best_delay_per_worker[w] = min( self.config.best_delay_per_worker[w], self.config.delay ) # Aktuellen Throughput für Worker-Level tracken if self.batch_metrics: recent_throughput = sum(m["throughput"] for m in self.batch_metrics[-3:]) / min(3, len(self.batch_metrics)) if w not in self.config.throughput_per_worker: self.config.throughput_per_worker[w] = recent_throughput else: # Gleitender Durchschnitt self.config.throughput_per_worker[w] = ( self.config.throughput_per_worker[w] * 0.7 + recent_throughput * 0.3 ) if self.config.delay > self.config.min_delay: # Delay noch nicht am Minimum → weiter reduzieren self.config.delay = max(self.config.min_delay, self.config.delay * 0.8) self.config.delay_fully_explored = False elif self.config.saturation_detected: # Sättigung erkannt → nicht mehr skalieren self._log(f"📊 Sättigung bei {self.config.workers} Workers — mehr bringt nichts") elif self.config.workers < self.config.max_workers: # Prüfe ob letzter Worker-Sprung Verbesserung gebracht hat prev_throughput = self.config.throughput_per_worker.get(w - 1, 0) curr_throughput = self.config.throughput_per_worker.get(w, 0) if prev_throughput > 0 and curr_throughput > 0: improvement = (curr_throughput - prev_throughput) / prev_throughput if improvement < self.config.saturation_threshold: # Weniger als 10% Verbesserung → Sättigung self.config.saturation_detected = True self._log(f"📊 Sättigung erkannt: {w-1}→{w} Workers nur +{improvement*100:.1f}% Throughput") return # Worker hinzufügen self.config.workers += 1 prev_best = self.config.best_delay_per_worker.get(w, 0.5) self.config.delay = max(prev_best, 0.3) self.config.delay_fully_explored = False self._log(f"🔄 Neuer Worker-Level: reset delay auf {self.config.delay:.2f}s für Exploration") else: self.config.delay_fully_explored = True if old_delay != self.config.delay or old_workers != self.config.workers: self._log(f"⚡ Speedup: delay={self.config.delay:.2f}s, workers={self.config.workers}") def _slowdown(self, severe: bool = False): """Verlangsamt bei Problemen.""" if severe: self.config.cooldown_until = time.time() + 30 # 30s Pause self.config.delay = min(2.0, self.config.delay * 2) self.config.workers = max(1, self.config.workers - 1) self.config.delay_fully_explored = False # Exploration resetten self._log(f"🛑 Throttled! Cooldown 30s, delay={self.config.delay:.2f}s, workers={self.config.workers}") else: self.config.delay = min(2.0, self.config.delay * 1.2) self._log(f"⚠️ Slowdown: delay={self.config.delay:.2f}s") def _wait_cooldown(self): """Wartet Cooldown ab.""" if self.config.cooldown_until > time.time(): wait = self.config.cooldown_until - time.time() self._log(f"⏳ Cooldown: {wait:.0f}s warten...") time.sleep(wait) def get_pending(self, limit: int) -> list[dict]: """Holt zu verarbeitende Vorlagen.""" conn = self._get_db() # Alle mit URL aber ohne Volltext, die nicht schon verarbeitet sind processed_ids = self.state.processed | self.state.failed_permanent query = """ SELECT a.vorlage_id, a.url FROM anlagen a JOIN vorlagen v ON a.vorlage_id = v.id WHERE a.url IS NOT NULL AND a.downloaded = 0 AND (v.volltext_clean IS NULL OR v.volltext_clean = '') ORDER BY v.datum_eingang DESC """ all_pending = conn.execute(query).fetchall() conn.close() # Filtern result = [] for row in all_pending: if row['vorlage_id'] not in processed_ids: result.append(dict(row)) if len(result) >= limit: break # Retries hinzufügen (am Ende) for vid_str, count in list(self.state.failed.items()): if len(result) >= limit: break vid = int(vid_str) # URL nochmal holen conn = self._get_db() row = conn.execute("SELECT vorlage_id, url FROM anlagen WHERE vorlage_id = ?", (vid,)).fetchone() conn.close() if row: result.append(dict(row)) self.state.stats["retried"] += 1 return result def run(self, limit: int = 1000): """Hauptschleife.""" self._log(f"=== Adaptive Extraktion gestartet ===") self._log(f"Limit: {limit}, Start-Config: delay={self.config.delay}s, workers={self.config.workers}") pending = self.get_pending(limit) self.state.stats["total"] = len(pending) self._log(f"Zu verarbeiten: {len(pending)}") if not pending: self._log("Nichts zu tun!") return batch_size = 50 processed_count = 0 batch_num = 0 for i in range(0, len(pending), batch_size): self._wait_cooldown() batch = pending[i:i+batch_size] batch_num += 1 batch_start = time.time() batch_success = 0 batch_failed = 0 batch_bytes = 0 self._log(f"\n--- Batch {batch_num}: {len(batch)} Vorlagen ---") self._log(f"Config: delay={self.config.delay:.2f}s, workers={self.config.workers}") with ThreadPoolExecutor(max_workers=self.config.workers) as executor: futures = {} for item in batch: time.sleep(self.config.delay) future = executor.submit( self._download_and_extract, item['vorlage_id'], item['url'] ) futures[future] = item for future in as_completed(futures): vorlage_id, text, error, bytes_dl = future.result() batch_bytes += bytes_dl if text: self._handle_success(vorlage_id, text) self._log(f" ✓ #{vorlage_id}: {len(text)} Zeichen") batch_success += 1 else: self._handle_failure(vorlage_id, error) batch_failed += 1 processed_count += 1 # Metriken aufzeichnen batch_time = time.time() - batch_start metric = self._record_metric(batch_num, batch_time, batch_success, batch_failed, batch_bytes) # Push to Netdata Statsd self._push_metrics(metric) # State speichern nach jedem Batch self._save_state() # Fortschritt stats = self.state.stats self._log(f"Progress: {processed_count}/{len(pending)} | ✓{stats['success']} ✗{len(self.state.failed_permanent)} | {batch_success/max(batch_time,0.1):.1f} docs/sec") # Telegram-Update (alle 5 min) self._maybe_notify() self._log(f"\n=== Fertig ===") self._log(f"Erfolgreich: {self.state.stats['success']}") self._log(f"Fehlgeschlagen: {len(self.state.failed_permanent)}") self._log(f"State gespeichert: {self.state_file}") # Finale Notification self._maybe_notify(force=True) if self.notify: self._send_telegram(f"✅ *PDF-Extraktion abgeschlossen*\n\n✓ {self.state.stats['success']:,} erfolgreich\n✗ {len(self.state.failed_permanent)} fehlgeschlagen") def main(): parser = argparse.ArgumentParser(description="Adaptive PDF-Extraktion") parser.add_argument("--limit", type=int, default=1000, help="Max. Anzahl") parser.add_argument("--reset", action="store_true", help="State zurücksetzen") parser.add_argument("--no-notify", action="store_true", help="Keine Telegram-Updates") parser.add_argument("--notify-interval", type=int, default=300, help="Sekunden zwischen Updates") args = parser.parse_args() if args.reset and STATE_FILE.exists(): STATE_FILE.unlink() print("State zurückgesetzt") # Metriken-Datei leeren bei Reset if args.reset and METRICS_FILE.exists(): METRICS_FILE.unlink() extractor = AdaptiveExtractor(notify=not args.no_notify) extractor.notify_interval = args.notify_interval extractor.run(limit=args.limit) if __name__ == "__main__": main()