Vollständige Pipeline zur Analyse kommunaler Vorlagen aus ALLRIS: - OParl-Import: 20.149 Vorlagen - PDF-Extraktion: 10.045 Volltexte (adaptives Throttling) - KI-Zusammenfassungen: 10.026 via Qwen Plus (parallelisiert) - Beratungsfolge-Scraper: Beschlusstexte + Wortprotokolle - Abstimmungs-Analyse mit Koalitionsmatrix - Georeferenzierung (Nominatim) Stack: FastAPI + SvelteKit + SQLite Deployment: Docker + Traefik auf VServer Daten (DB, Logs) nicht im Repo — siehe Restic-Backup. Repo-Setup: scripts/setup.sh für Neuaufbau aus OParl-API.
156 lines
4.6 KiB
Python
156 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Lädt PDFs von URLs und extrahiert Text mit PyMuPDF.
|
|
Parallelisiert für Geschwindigkeit.
|
|
"""
|
|
|
|
import argparse
|
|
import sqlite3
|
|
import tempfile
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pymupdf # PyMuPDF
|
|
|
|
# Resolve paths relative to this script's location so the tool works
# regardless of the current working directory (scripts/ -> project root).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# SQLite DB holding the `vorlagen` and `anlagen` tables (not in the repo).
DB_PATH = PROJECT_ROOT / "data" / "tracker_remote.db"

# Rate limiting: cap submissions at 5 requests/second toward the ALLRIS
# server; MIN_DELAY is the pause inserted between task submissions.
REQUESTS_PER_SECOND = 5
MIN_DELAY = 1.0 / REQUESTS_PER_SECOND
|
|
|
|
|
|
def get_db():
    """Open a connection to the tracker SQLite database.

    Rows come back as ``sqlite3.Row`` so columns are addressable by name.
    ``check_same_thread=False`` permits the connection object to coexist
    with the download worker threads (only the main thread writes).
    """
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def download_and_extract(vorlage_id: int, url: str) -> tuple[int, str | None, str | None]:
    """Download a PDF from *url* and extract its plain text with PyMuPDF.

    Designed to run inside a worker thread: it never raises, all failures
    are reported via the returned error string.

    Args:
        vorlage_id: Database id of the Vorlage this attachment belongs to.
        url: Direct download URL of the PDF.

    Returns:
        ``(vorlage_id, text, error)`` — exactly one of *text* / *error*
        is ``None``.
    """
    try:
        # Download; ALLRIS frequently serves attachments via redirect.
        resp = httpx.get(url, timeout=60, follow_redirects=True)
        resp.raise_for_status()

        # Anything under 100 bytes cannot be a real PDF (error page, stub).
        if len(resp.content) < 100:
            return (vorlage_id, None, "PDF zu klein")

        # Write to a temp file for PyMuPDF; both context managers guarantee
        # cleanup. The document handle previously leaked when get_text()
        # raised mid-extraction — `with pymupdf.open(...)` fixes that.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp:
            tmp.write(resp.content)
            tmp.flush()

            with pymupdf.open(tmp.name) as doc:
                text_parts = [page.get_text() for page in doc]

        text = "\n".join(text_parts).strip()

        # Scanned PDFs without an OCR layer yield (almost) no text.
        if len(text) < 50:
            return (vorlage_id, None, "Kein Text extrahiert")

        return (vorlage_id, text, None)

    except httpx.HTTPStatusError as e:
        return (vorlage_id, None, f"HTTP {e.response.status_code}")
    except Exception as e:
        # Truncate arbitrary error messages so they stay log/DB friendly.
        return (vorlage_id, None, str(e)[:100])
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize whitespace in text extracted from a PDF.

    Collapses runs of three or more newlines into a single blank line,
    squeezes repeated spaces to one, and strips surrounding whitespace.
    """
    import re

    collapsed = re.sub(r"\n{3,}", "\n\n", text)
    squeezed = re.sub(r" {2,}", " ", collapsed)
    return squeezed.strip()
|
|
|
|
|
|
def process_batch(vorlagen: list[dict], workers: int = 5) -> dict:
    """Run a batch of download/extract jobs in parallel and persist results.

    Each item in *vorlagen* must provide ``vorlage_id`` and ``url``.
    Successful extractions are written to the DB (raw + cleaned text, and
    the attachment is marked downloaded); failures are collected in the
    returned stats dict.

    Returns:
        ``{"success": int, "failed": int, "errors": [(id, error), ...]}``
    """
    stats = {"success": 0, "failed": 0, "errors": []}
    conn = get_db()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {}
        for item in vorlagen:
            # Stagger submissions so the pool never exceeds the global
            # request rate against the ALLRIS server.
            time.sleep(MIN_DELAY)
            fut = pool.submit(download_and_extract, item['vorlage_id'], item['url'])
            pending[fut] = item

        # DB writes happen only here, on the main thread.
        for fut in as_completed(pending):
            vorlage_id, text, error = fut.result()

            if text:
                cleaned = clean_text(text)
                conn.execute(
                    """
                UPDATE vorlagen SET volltext = ?, volltext_clean = ?
                WHERE id = ?
            """,
                    (text, cleaned, vorlage_id),
                )
                conn.execute(
                    """
                UPDATE anlagen SET downloaded = 1 WHERE vorlage_id = ?
            """,
                    (vorlage_id,),
                )
                # Commit per item so progress survives an interruption.
                conn.commit()
                stats["success"] += 1
                print(f" ✓ #{vorlage_id}: {len(cleaned)} Zeichen")
            else:
                stats["failed"] += 1
                stats["errors"].append((vorlage_id, error))
                print(f" ✗ #{vorlage_id}: {error}")

    conn.close()
    return stats
|
|
|
|
|
|
def main():
    """CLI entry point: select pending attachments and extract their text."""
    parser = argparse.ArgumentParser(description="PDF-Extraktion")
    parser.add_argument("--limit", type=int, default=100, help="Max. Anzahl")
    parser.add_argument("--workers", type=int, default=5, help="Parallele Downloads")
    parser.add_argument("--offset", type=int, default=0, help="Start-Offset")
    args = parser.parse_args()

    print(f"=== PDF-Extraktion ===")
    print(f"Limit: {args.limit}, Workers: {args.workers}\n")

    # Select attachments that have a URL but whose Vorlage still lacks a
    # cleaned full text, newest first. The connection is closed before the
    # batch runs; process_batch opens its own.
    conn = get_db()
    rows = conn.execute("""
        SELECT a.vorlage_id, a.url
        FROM anlagen a
        JOIN vorlagen v ON a.vorlage_id = v.id
        WHERE a.url IS NOT NULL
          AND a.downloaded = 0
          AND (v.volltext_clean IS NULL OR v.volltext_clean = '')
        ORDER BY v.datum_eingang DESC
        LIMIT ? OFFSET ?
    """, (args.limit, args.offset)).fetchall()
    conn.close()

    print(f"Zu verarbeiten: {len(rows)}\n")

    if not rows:
        print("Nichts zu tun!")
        return

    results = process_batch([dict(row) for row in rows], args.workers)

    print(f"\n=== Fertig ===")
    print(f"Erfolgreich: {results['success']}")
    print(f"Fehlgeschlagen: {results['failed']}")


if __name__ == "__main__":
    main()
|