feat(#106): Ingest-CLI fuer NRW-Plenarprotokolle

app/ingest_votes_nrw.py: Pipeline PDF → protokoll_parser_nrw → DB.

CLI:
  python -m app.ingest_votes_nrw --pdf /pfad/MMP18-119.pdf
  python -m app.ingest_votes_nrw --url https://landtag.nrw.de/.../MMP18-119.pdf
  python -m app.ingest_votes_nrw --pdf x.pdf --protokoll-id MMP18-119 --bundesland NRW

Protokoll-ID wird default aus Datei-Stem abgeleitet (MMP18-119.pdf →
MMP18-119), URL-Mode parst sie aus dem letzten Pfadsegment.

ingest_pdf() ist die programmatische API (auch fuer Folge-Cron, falls
spaeter automatisch Plenarprotokoll-Sammelinges nachgeruestet wird).
Statistik-Dict: parsed/written/skipped_no_drucksache/errors.

6 Tests: Roundtrip, skip-bei-fehlender-Drucksache, default + override
fuer Protokoll-ID, BL-Override (fuer #126-Folge), idempotenter Re-Ingest.
This commit is contained in:
Dotty Dotter 2026-04-28 08:03:18 +02:00
parent ae3f48be41
commit e26607854f
2 changed files with 313 additions and 0 deletions

153
app/ingest_votes_nrw.py Normal file
View File

@ -0,0 +1,153 @@
"""Ingest-CLI fuer NRW-Plenarprotokolle (#106).
Pipeline:
1. PDF laden (Pfad oder URL)
2. protokoll_parser_nrw.parse_protocol() liefert Liste von Abstimmungen
3. upsert_plenum_vote() schreibt jede Abstimmung in die DB
CLI:
python -m app.ingest_votes_nrw --pdf /pfad/zu/MMP18-119.pdf
python -m app.ingest_votes_nrw --url https://landtag.nrw.de/.../MMP18-119.pdf
python -m app.ingest_votes_nrw --pdf MMP18-119.pdf --protokoll-id MMP18-119
Die Protokoll-ID wird, wenn nicht uebergeben, aus dem Datei-Stem abgeleitet.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import sys
import tempfile
import urllib.request
from pathlib import Path
from typing import Optional
from .protokoll_parser_nrw import parse_protocol
from .database import upsert_plenum_vote
logger = logging.getLogger(__name__)
def _derive_protokoll_id(pdf_path: Path) -> str:
"""Ermittle Protokoll-ID aus dem Datei-Stem (z.B. 'MMP18-119.pdf''MMP18-119')."""
return pdf_path.stem
def _download_pdf(url: str, dest: Path) -> Path:
"""Lade ein PDF von einer URL in einen Pfad. Wirft bei HTTP-Fehlern."""
req = urllib.request.Request(
url,
headers={"User-Agent": "GWOeAntragspruefer/1.0 (+https://gwoe.toppyr.de)"},
)
with urllib.request.urlopen(req, timeout=60) as resp:
dest.write_bytes(resp.read())
return dest
async def ingest_pdf(
pdf_path: Path,
*,
bundesland: str = "NRW",
protokoll_id: Optional[str] = None,
quelle_url: Optional[str] = None,
) -> dict:
"""Parse das PDF und schreibe alle gefundenen Abstimmungen in die DB.
Returns:
Statistik-Dict ``{parsed, written, skipped_no_drucksache, errors}``.
"""
pid = protokoll_id or _derive_protokoll_id(pdf_path)
parsed = parse_protocol(str(pdf_path))
written = 0
skipped_no_ds = 0
errors: list[str] = []
for entry in parsed:
ds = entry.get("drucksache")
if not ds:
skipped_no_ds += 1
continue
try:
await upsert_plenum_vote(
bundesland=bundesland,
drucksache=ds,
ergebnis=entry["ergebnis"],
einstimmig=bool(entry.get("einstimmig", False)),
fraktionen_ja=entry.get("votes", {}).get("ja", []),
fraktionen_nein=entry.get("votes", {}).get("nein", []),
fraktionen_enthaltung=entry.get("votes", {}).get("enthaltung", []),
quelle_protokoll=pid,
quelle_url=quelle_url,
)
written += 1
except Exception as exc:
logger.exception("Upsert fehlgeschlagen fuer %s", ds)
errors.append(f"{ds}: {exc}")
return {
"parsed": len(parsed),
"written": written,
"skipped_no_drucksache": skipped_no_ds,
"errors": errors,
"protokoll_id": pid,
"bundesland": bundesland,
}
def _cli() -> None:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
parser = argparse.ArgumentParser(
description="Plenarprotokoll → plenum_vote_results-Tabelle (#106)",
)
src = parser.add_mutually_exclusive_group(required=True)
src.add_argument("--pdf", help="Pfad zu lokalem PDF")
src.add_argument("--url", help="HTTP(S)-URL zum PDF")
parser.add_argument("--bundesland", default="NRW",
help="Bundesland-Code (default: NRW)")
parser.add_argument("--protokoll-id",
help="Protokoll-ID (default: aus Datei-Stem)")
args = parser.parse_args()
if args.url:
# Download in tmp und nach dem Run wieder loeschen
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
tmp_path = Path(tmp.name)
try:
print(f"Lade {args.url}{tmp_path}")
_download_pdf(args.url, tmp_path)
pid = args.protokoll_id or args.url.rsplit("/", 1)[-1].rsplit(".", 1)[0]
stats = asyncio.run(ingest_pdf(
tmp_path, bundesland=args.bundesland,
protokoll_id=pid, quelle_url=args.url,
))
finally:
tmp_path.unlink(missing_ok=True)
else:
pdf_path = Path(args.pdf)
if not pdf_path.exists():
print(f"FEHLER: PDF nicht gefunden: {pdf_path}", file=sys.stderr)
sys.exit(1)
stats = asyncio.run(ingest_pdf(
pdf_path, bundesland=args.bundesland,
protokoll_id=args.protokoll_id,
))
print()
print(f"Protokoll {stats['protokoll_id']} ({stats['bundesland']})")
print(f" parsed: {stats['parsed']}")
print(f" written: {stats['written']}")
if stats["skipped_no_drucksache"]:
print(f" ohne DS: {stats['skipped_no_drucksache']}")
if stats["errors"]:
print(f" errors: {len(stats['errors'])}")
for e in stats["errors"][:5]:
print(f" {e}")
if stats["written"] == 0 and not stats["errors"]:
sys.exit(2)
if __name__ == "__main__":
_cli()

View File

@ -0,0 +1,160 @@
"""Tests fuer app/ingest_votes_nrw.py — PDF → plenum_vote_results Pipeline (#106)."""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
# Gleiches aiosqlite-Setup-Problem wie in test_database.py — dort fix
# importieren, damit hier nichts gestubbed ist.
_aio = sys.modules.get("aiosqlite")
if _aio is not None and not hasattr(_aio, "connect"):
del sys.modules["aiosqlite"]
import aiosqlite # noqa: E402
import importlib # noqa: E402
if "app.database" in sys.modules:
if not hasattr(getattr(sys.modules["app.database"], "aiosqlite", None), "connect"):
del sys.modules["app.database"]
importlib.import_module("app.database")
else:
importlib.import_module("app.database")
def run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
@pytest.fixture()
def db_path(tmp_path, monkeypatch):
path = tmp_path / "test.db"
from app.config import settings
monkeypatch.setattr(settings, "db_path", str(path))
return str(path)
@pytest.fixture()
def initialized_db(db_path):
from app import database
run(database.init_db())
return db_path
def _fake_parse_result(drucksache: str, ergebnis: str = "angenommen",
einstimmig: bool = False,
ja: list[str] = None, nein: list[str] = None,
enth: list[str] = None) -> dict:
return {
"drucksache": drucksache,
"ergebnis": ergebnis,
"einstimmig": einstimmig,
"votes": {
"ja": ja or [],
"nein": nein or [],
"enthaltung": enth or [],
},
"kind": "direct",
}
class TestIngestPdf:
def test_writes_each_parsed_vote(self, initialized_db, tmp_path):
from app import ingest_votes_nrw, database
fake_pdf = tmp_path / "MMP18-119.pdf"
fake_pdf.write_bytes(b"%PDF-1.4 fake")
parser_results = [
_fake_parse_result("18/100", "angenommen", ja=["CDU", "SPD"], nein=["AfD"]),
_fake_parse_result("18/200", "abgelehnt", ja=["AfD"], nein=["CDU", "SPD"]),
]
with patch("app.ingest_votes_nrw.parse_protocol", return_value=parser_results):
stats = run(ingest_votes_nrw.ingest_pdf(fake_pdf))
assert stats["parsed"] == 2
assert stats["written"] == 2
votes_100 = run(database.get_plenum_votes("NRW", "18/100"))
assert len(votes_100) == 1
assert votes_100[0]["fraktionen_ja"] == ["CDU", "SPD"]
assert votes_100[0]["quelle_protokoll"] == "MMP18-119"
def test_skips_entries_without_drucksache(self, initialized_db, tmp_path):
"""Anchors ohne aufloesbare Drucksache werden gezaehlt aber nicht
geschrieben (sonst muellt der Import die DB voll)."""
from app import ingest_votes_nrw
fake_pdf = tmp_path / "MMP18-50.pdf"
fake_pdf.write_bytes(b"%PDF")
parser_results = [
_fake_parse_result("18/300", "angenommen"),
{"drucksache": None, "ergebnis": "angenommen", "votes": {"ja": [], "nein": [], "enthaltung": []}},
]
with patch("app.ingest_votes_nrw.parse_protocol", return_value=parser_results):
stats = run(ingest_votes_nrw.ingest_pdf(fake_pdf))
assert stats["parsed"] == 2
assert stats["written"] == 1
assert stats["skipped_no_drucksache"] == 1
def test_protokoll_id_default_from_stem(self, initialized_db, tmp_path):
from app import ingest_votes_nrw, database
fake_pdf = tmp_path / "MMP18-77.pdf"
fake_pdf.write_bytes(b"%PDF")
with patch("app.ingest_votes_nrw.parse_protocol",
return_value=[_fake_parse_result("18/500")]):
stats = run(ingest_votes_nrw.ingest_pdf(fake_pdf))
assert stats["protokoll_id"] == "MMP18-77"
votes = run(database.get_plenum_votes("NRW", "18/500"))
assert votes[0]["quelle_protokoll"] == "MMP18-77"
def test_protokoll_id_override(self, initialized_db, tmp_path):
from app import ingest_votes_nrw, database
fake_pdf = tmp_path / "scan.pdf"
fake_pdf.write_bytes(b"%PDF")
with patch("app.ingest_votes_nrw.parse_protocol",
return_value=[_fake_parse_result("18/600")]):
run(ingest_votes_nrw.ingest_pdf(
fake_pdf, protokoll_id="MMP18-99", quelle_url="https://example.com/x.pdf",
))
votes = run(database.get_plenum_votes("NRW", "18/600"))
assert votes[0]["quelle_protokoll"] == "MMP18-99"
assert votes[0]["quelle_url"] == "https://example.com/x.pdf"
def test_bundesland_override(self, initialized_db, tmp_path):
"""Adapter fuer andere BL koennten denselben Ingest-Helper nutzen."""
from app import ingest_votes_nrw, database
fake_pdf = tmp_path / "MV-MP1.pdf"
fake_pdf.write_bytes(b"%PDF")
with patch("app.ingest_votes_nrw.parse_protocol",
return_value=[_fake_parse_result("8/100")]):
run(ingest_votes_nrw.ingest_pdf(fake_pdf, bundesland="MV"))
# Lookup unter dem richtigen BL
votes_mv = run(database.get_plenum_votes("MV", "8/100"))
assert len(votes_mv) == 1
votes_nrw = run(database.get_plenum_votes("NRW", "8/100"))
assert votes_nrw == []
def test_re_ingest_overwrites_same_protokoll(self, initialized_db, tmp_path):
"""Erneuter Ingest desselben Protokolls aktualisiert die Eintraege
(idempotent), kein Duplikat."""
from app import ingest_votes_nrw, database
fake_pdf = tmp_path / "MMP18-1.pdf"
fake_pdf.write_bytes(b"%PDF")
with patch("app.ingest_votes_nrw.parse_protocol",
return_value=[_fake_parse_result("18/700", "angenommen", ja=["CDU"])]):
run(ingest_votes_nrw.ingest_pdf(fake_pdf))
# Re-Ingest mit korrigiertem Ergebnis (z.B. Parser-Fix)
with patch("app.ingest_votes_nrw.parse_protocol",
return_value=[_fake_parse_result("18/700", "abgelehnt", ja=[], nein=["CDU"])]):
run(ingest_votes_nrw.ingest_pdf(fake_pdf))
votes = run(database.get_plenum_votes("NRW", "18/700"))
assert len(votes) == 1
assert votes[0]["ergebnis"] == "abgelehnt"
assert votes[0]["fraktionen_nein"] == ["CDU"]