#!/usr/bin/env python3
"""Download PDFs from URLs and extract their text with PyMuPDF.

Parallelized with a thread pool for speed; request submission is
staggered for simple client-side rate limiting. Results are written
back to the SQLite tracker database.
"""
import argparse
import re
import sqlite3
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import httpx
import pymupdf  # PyMuPDF

PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"

# Rate limiting: minimum delay between task submissions.
REQUESTS_PER_SECOND = 5
MIN_DELAY = 1.0 / REQUESTS_PER_SECOND

# Cleanup patterns compiled once at import time (hoisted out of clean_text).
_MULTI_BLANK_RE = re.compile(r'\n{3,}')
_MULTI_SPACE_RE = re.compile(r' {2,}')


def get_db() -> sqlite3.Connection:
    """Open the tracker database; rows come back as sqlite3.Row.

    check_same_thread=False because connections may be created/used from
    different threads (all actual writes happen on the main thread).
    """
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


def download_and_extract(vorlage_id: int, url: str) -> tuple[int, str | None, str | None]:
    """Download a PDF and extract its text.

    Worker function for the thread pool: never raises — all failures are
    reported through the error slot of the return value.

    Returns:
        (vorlage_id, text, error) — exactly one of text/error is non-None.
    """
    try:
        resp = httpx.get(url, timeout=60, follow_redirects=True)
        resp.raise_for_status()

        # Anything under 100 bytes cannot be a real PDF.
        if len(resp.content) < 100:
            return (vorlage_id, None, "PDF zu klein")

        # Open directly from memory instead of a NamedTemporaryFile:
        # avoids the disk round-trip and the Windows limitation where an
        # open delete-on-close temp file cannot be reopened by name.
        # The `with` form guarantees the document is closed even if a
        # page raises during extraction.
        with pymupdf.open(stream=resp.content, filetype="pdf") as doc:
            text = "\n".join(page.get_text() for page in doc).strip()

        if len(text) < 50:
            return (vorlage_id, None, "Kein Text extrahiert")

        return (vorlage_id, text, None)

    except httpx.HTTPStatusError as e:
        return (vorlage_id, None, f"HTTP {e.response.status_code}")
    except Exception as e:
        # Broad on purpose: a worker exception must not escape into the pool.
        return (vorlage_id, None, str(e)[:100])


def clean_text(text: str) -> str:
    """Normalize extracted text: collapse blank-line runs and repeated spaces."""
    text = _MULTI_BLANK_RE.sub('\n\n', text)
    text = _MULTI_SPACE_RE.sub(' ', text)
    return text.strip()


def process_batch(vorlagen: list[dict], workers: int = 5) -> dict:
    """Process a batch of {'vorlage_id', 'url'} records in parallel.

    Downloads/extraction run in a thread pool; all database writes happen
    in this (main) thread as futures complete, so the shared connection is
    never written from multiple threads.

    Returns:
        {"success": int, "failed": int, "errors": [(vorlage_id, error), ...]}
    """
    results = {"success": 0, "failed": 0, "errors": []}
    conn = get_db()

    try:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {}
            for v in vorlagen:
                time.sleep(MIN_DELAY)  # rate limiting: stagger submissions
                future = executor.submit(
                    download_and_extract, v['vorlage_id'], v['url']
                )
                futures[future] = v

            for future in as_completed(futures):
                # download_and_extract never raises, so result() is safe.
                vorlage_id, text, error = future.result()

                if text:
                    clean = clean_text(text)
                    conn.execute("""
                        UPDATE vorlagen SET volltext = ?, volltext_clean = ? WHERE id = ?
                    """, (text, clean, vorlage_id))
                    conn.execute("""
                        UPDATE anlagen SET downloaded = 1 WHERE vorlage_id = ?
                    """, (vorlage_id,))
                    conn.commit()
                    results["success"] += 1
                    print(f" ✓ #{vorlage_id}: {len(clean)} Zeichen")
                else:
                    # Failures are NOT marked downloaded, so they will be
                    # retried on the next run.
                    results["failed"] += 1
                    results["errors"].append((vorlage_id, error))
                    print(f" ✗ #{vorlage_id}: {error}")
    finally:
        # Close even if a DB write raised mid-batch.
        conn.close()

    return results


def main() -> None:
    """CLI entry point: select pending PDFs and run the extraction batch."""
    parser = argparse.ArgumentParser(description="PDF-Extraktion")
    parser.add_argument("--limit", type=int, default=100, help="Max. Anzahl")
    parser.add_argument("--workers", type=int, default=5, help="Parallele Downloads")
    parser.add_argument("--offset", type=int, default=0, help="Start-Offset")
    args = parser.parse_args()

    print("=== PDF-Extraktion ===")
    print(f"Limit: {args.limit}, Workers: {args.workers}\n")

    conn = get_db()
    # Vorlagen with a PDF URL attached but no extracted full text yet.
    vorlagen = conn.execute("""
        SELECT a.vorlage_id, a.url
        FROM anlagen a
        JOIN vorlagen v ON a.vorlage_id = v.id
        WHERE a.url IS NOT NULL
          AND a.downloaded = 0
          AND (v.volltext_clean IS NULL OR v.volltext_clean = '')
        ORDER BY v.datum_eingang DESC
        LIMIT ? OFFSET ?
    """, (args.limit, args.offset)).fetchall()
    conn.close()

    print(f"Zu verarbeiten: {len(vorlagen)}\n")

    if not vorlagen:
        print("Nichts zu tun!")
        return

    results = process_batch([dict(v) for v in vorlagen], args.workers)

    print("\n=== Fertig ===")
    print(f"Erfolgreich: {results['success']}")
    print(f"Fehlgeschlagen: {results['failed']}")


if __name__ == "__main__":
    main()