antragstracker/scripts/extract_pdfs.py

156 lines
4.6 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Download PDFs from URLs and extract their text with PyMuPDF.

Parallelized with a thread pool for speed; results are written back to
the local SQLite tracker database.
"""
import argparse
import re
import sqlite3
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import httpx
import pymupdf  # PyMuPDF
# Repository root, resolved relative to this script (scripts/ -> project root).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"
# Rate limiting: cap outgoing download submissions at this many per second.
REQUESTS_PER_SECOND = 5
MIN_DELAY = 1.0 / REQUESTS_PER_SECOND
def get_db():
    """Open a connection to the tracker SQLite database.

    Rows come back as sqlite3.Row so columns are addressable by name.
    check_same_thread=False lets the connection object cross thread
    boundaries (writes here still happen from a single thread).
    """
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def download_and_extract(vorlage_id: int, url: str) -> tuple[int, str | None, str | None]:
    """Download the PDF at *url* and extract its plain text.

    Args:
        vorlage_id: Database id of the Vorlage the PDF belongs to; passed
            through unchanged so callers can match results to rows.
        url: Direct download URL of the PDF.

    Returns:
        (vorlage_id, text, error) — exactly one of *text* and *error*
        is non-None.
    """
    try:
        resp = httpx.get(url, timeout=60, follow_redirects=True)
        resp.raise_for_status()
        if len(resp.content) < 100:
            # A response this small cannot be a real PDF.
            return (vorlage_id, None, "PDF zu klein")
        # Open the PDF directly from the in-memory bytes. This removes the
        # temp file entirely and fixes two defects of the old approach:
        # on Windows a NamedTemporaryFile cannot be reopened by name while
        # still open, and the document was leaked if get_text() raised.
        with pymupdf.open(stream=resp.content, filetype="pdf") as doc:
            text = "\n".join(page.get_text() for page in doc).strip()
        if len(text) < 50:
            # Scanned / image-only PDFs have no usable text layer.
            return (vorlage_id, None, "Kein Text extrahiert")
        return (vorlage_id, text, None)
    except httpx.HTTPStatusError as e:
        return (vorlage_id, None, f"HTTP {e.response.status_code}")
    except Exception as e:
        # Best-effort batch job: record a truncated error and keep going.
        return (vorlage_id, None, str(e)[:100])
def clean_text(text: str) -> str:
    """Normalize whitespace in extracted PDF text.

    Collapses runs of three or more newlines into a single blank line and
    runs of multiple spaces into one space, then strips surrounding
    whitespace. (`import re` was hoisted to module level instead of being
    re-executed on every call.)
    """
    # Reduce multiple blank lines to at most one.
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Collapse runs of spaces.
    text = re.sub(r' {2,}', ' ', text)
    return text.strip()
def process_batch(vorlagen: list[dict], workers: int = 5) -> dict:
    """Download and extract a batch of Vorlagen in parallel, persisting results.

    Each entry in *vorlagen* must carry 'vorlage_id' and 'url'. Successful
    extractions are written to the database immediately (one commit per
    item); failures are collected in the returned stats dict under
    "errors" as (vorlage_id, error) tuples.
    """
    stats = {"success": 0, "failed": 0, "errors": []}
    conn = get_db()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {}
        for item in vorlagen:
            # Throttle submissions so downloads start no faster than
            # REQUESTS_PER_SECOND.
            time.sleep(MIN_DELAY)
            fut = pool.submit(download_and_extract, item['vorlage_id'], item['url'])
            pending[fut] = item
        for fut in as_completed(pending):
            vorlage_id, text, error = fut.result()
            if not text:
                stats["failed"] += 1
                stats["errors"].append((vorlage_id, error))
                print(f" ✗ #{vorlage_id}: {error}")
                continue
            clean = clean_text(text)
            conn.execute("""
                UPDATE vorlagen SET volltext = ?, volltext_clean = ?
                WHERE id = ?
            """, (text, clean, vorlage_id))
            conn.execute("""
                UPDATE anlagen SET downloaded = 1 WHERE vorlage_id = ?
            """, (vorlage_id,))
            conn.commit()
            stats["success"] += 1
            print(f" ✓ #{vorlage_id}: {len(clean)} Zeichen")
    conn.close()
    return stats
def main():
    """CLI entry point: select Vorlagen lacking full text and extract them.

    Flags: --limit (max rows), --workers (parallel downloads),
    --offset (start offset into the result set).
    """
    parser = argparse.ArgumentParser(description="PDF-Extraktion")
    parser.add_argument("--limit", type=int, default=100, help="Max. Anzahl")
    parser.add_argument("--workers", type=int, default=5, help="Parallele Downloads")
    parser.add_argument("--offset", type=int, default=0, help="Start-Offset")
    args = parser.parse_args()

    # f-prefixes removed from placeholder-free literals (ruff F541).
    print("=== PDF-Extraktion ===")
    print(f"Limit: {args.limit}, Workers: {args.workers}\n")

    conn = get_db()
    # Attachments with a URL that were never downloaded, joined to Vorlagen
    # still missing cleaned full text; newest first.
    vorlagen = conn.execute("""
        SELECT a.vorlage_id, a.url
        FROM anlagen a
        JOIN vorlagen v ON a.vorlage_id = v.id
        WHERE a.url IS NOT NULL
          AND a.downloaded = 0
          AND (v.volltext_clean IS NULL OR v.volltext_clean = '')
        ORDER BY v.datum_eingang DESC
        LIMIT ? OFFSET ?
    """, (args.limit, args.offset)).fetchall()
    conn.close()

    print(f"Zu verarbeiten: {len(vorlagen)}\n")
    if not vorlagen:
        print("Nichts zu tun!")
        return

    results = process_batch([dict(v) for v in vorlagen], args.workers)

    print("\n=== Fertig ===")
    print(f"Erfolgreich: {results['success']}")
    print(f"Fehlgeschlagen: {results['failed']}")
# Allow use both as a runnable script and as an importable module.
if __name__ == "__main__":
    main()