gwoe-antragspruefer/app/protokoll_parser_nrw.py
Dotty Dotter d640734641 feat(#106,#134): NRW-Protokoll-Parser v5 ins Repo migriert
Vorher als parser_v5_iteration15.py nur auf Prod-Server, nicht
versionskontrolliert. Jetzt unter app/protokoll_parser_nrw.py
mit klarem Naming-Schema (BL-Suffix, damit Folge-Adapter analog
heissen koennen, vgl. ADR 0002).

Aenderungen am Code:
- from __future__ import annotations (Py3.9-kompatibel fuer 'str | None')
- fitz-Import optional (try/except), damit pure-string-Funktionen
  auch im Stub-conftest funktionieren

30 Tests in test_protokoll_parser_nrw.py (#134 Phase 2):
- normalize_fraktionen: F.D.P., GRÜNE-Aliase, Landesregierung
- _is_empty_phrase: Niemand/Keine/nicht-Mustern
- _parse_vote_block: ja/nein-Extraktion plus Negationen
- find_results: angenommen/abgelehnt, einstimmig (nur ueber-Kind!),
  (neu)-Suffix in Drucksachen-Nrn, Sortierung, Dedup
- resolve_drucksache_for_ueber: Backward-Search mit closest-match

Refs #106 (Abstimmungsverhalten verknuepfen — Vorbereitung fuer DB-Schema)
Refs #126 (BL-uebergreifender Parser — NRW als Referenz-Implementierung)
Refs #134 (Test-Suite Audit — Phase 2)
2026-04-28 02:08:03 +02:00

349 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""NRW-Plenarprotokoll Abstimmungs-Parser v5 (deterministisch, anchor-basiert).
Neue Architektur: Statt pro Drucksache zu suchen, findet der Parser zuerst
alle **Result-Anchors** im Volltext ("Damit ist ... angenommen/abgelehnt/...")
und extrahiert pro Anchor rückwärts:
1. die zugehörige Drucksache (nächste 18/XXXXX davor, innerhalb ~500 chars)
2. den Vote-Block (letztes "Wer stimmt ... zu?" vor dem Anchor)
Fixture-basierte Tests. Ziel: 18/19 (17824 ist bewusst nicht_gesondert).
Migriert nach app/ aus dem POC-Skript parser_v5_iteration15.py
(2026-04-28, #134/#106). Fitz-Import ist optional — pure-string-Funktionen
laufen ohne, parse_protocol() braucht das echte fitz.
"""
from __future__ import annotations
import re
import json
import sys
try: # fitz ist optional — pure-string-Funktionen laufen ohne
import fitz
except ImportError:
fitz = None
FRAKTIONEN_MAP = [
("Bündnis 90/Die Grünen", "GRÜNE"),
("Bündnis 90", "GRÜNE"),
("Grünen", "GRÜNE"),
("GRÜNE", "GRÜNE"),
("F.D.P.", "FDP"),
("FDP", "FDP"),
("CDU", "CDU"),
("SPD", "SPD"),
("AfD", "AfD"),
("LINKE", "LINKE"),
("BSW", "BSW"),
("Landesregierung", "Landesregierung"),
]
ALLE_FRAKTIONEN_NRW = ["CDU", "SPD", "GRÜNE", "FDP", "AfD"]
def normalize_fraktionen(txt):
"""Extrahiere Fraktions-Tokens aus einem Text-Abschnitt."""
found = set()
# Reihenfolge: längere zuerst (damit "Bündnis 90/Die Grünen" vor "Grünen" matcht)
remaining = txt
for key, val in FRAKTIONEN_MAP:
if key in remaining:
found.add(val)
remaining = remaining.replace(key, "") # Doppel-Match vermeiden
return sorted(found)
def _is_empty_phrase(txt):
"""Prüft ob der Text eine Negation ausdrückt (niemand, nicht, keine)."""
neg = ["niemand", "Niemand", "Keine", "keine", "nicht der Fall",
"Auch nicht", "ist nicht", "ist auch nicht", "nicht vor"]
return any(n in txt for n in neg)
def _parse_vote_block(block: str) -> dict:
"""Extrahiere ja/nein/enthaltung aus dem Text-Block vor einem Result-Anchor.
Vereinfachter Ansatz: matche bis zum nächsten '?' oder 200 chars.
"""
votes = {"ja": [], "nein": [], "enthaltung": []}
# JA — letztes Match gewinnt (bei Re-Votes)
ja_matches = list(re.finditer(
r"Wer stimmt(?! dagegen)[^?]{0,80}zu\?\s*[-]?\s*([^?]{1,250})",
block
))
if ja_matches:
g = ja_matches[-1].group(1)
if not _is_empty_phrase(g):
votes["ja"] = normalize_fraktionen(g)
# NEIN
nein_patterns = [
r"Wer stimmt dagegen\?\s*[-]?\s*([^?]{1,200})",
r"Wer lehnt[^?]{0,30}ab\?\s*[-]?\s*([^?]{1,200})",
r"Stimmt jemand dagegen\?\s*[-]?\s*([^?]{1,120})",
r"Ist jemand dagegen\?\s*[-]?\s*([^?]{1,120})",
]
for pat in nein_patterns:
matches = list(re.finditer(pat, block))
if matches:
g = matches[-1].group(1)
votes["nein"] = [] if _is_empty_phrase(g) else normalize_fraktionen(g)
break
# ENTHALTUNG
enth_patterns = [
r"Wer enthält sich\?\s*[-]?\s*([^?]{1,200})",
r"Gibt es Enthaltungen\?\s*[-]?\s*([^?]{1,200})",
r"Enthält sich jemand\?\s*[-]?\s*([^?]{1,120})",
r"Möchte sich jemand enthalten\?\s*[-]?\s*([^?]{1,120})",
]
for pat in enth_patterns:
matches = list(re.finditer(pat, block))
if matches:
g = matches[-1].group(1)
votes["enthaltung"] = [] if _is_empty_phrase(g) else normalize_fraktionen(g)
break
# Implizite leere Enthaltungen: "Enthaltungen gibt es damit nicht"
if not votes["enthaltung"] and re.search(r"Enthaltungen\s+gibt\s+es\s+damit\s+nicht", block):
votes["enthaltung"] = []
return votes
# Result-Anchors: Pattern → (ergebnis, is_ueberweisung)
# v6: Broad-Anchor-Matches für alle direkten Varianten.
# Type 'direct_broad': matcht "Damit/Somit ist der/dieser/die Antrag/Gesetzentwurf/...
# ... angenommen/abgelehnt/überwiesen/verabschiedet" — Drucksache wird
# separat aus dem Match-Span extrahiert (oder aus dem vorangehenden Segment).
RESULT_ANCHORS = [
# Broad direct-result pattern (deckt fast alle Varianten ab).
# "beschlossen" = bei direkter Abstimmung eines Antrags = angenommen
(r"(?:Damit|Somit) ist (?:der|dieser|die|diese) (?:Antrag|Gesetzentwurf|Änderungsantrag|Wahlvorschlag|Entschließungsantrag|Beschlussempfehlung)[^.]{0,200}?(angenommen|abgelehnt|überwiesen|zurückgezogen|verabschiedet|beschlossen)", "direct_broad"),
# Variante ohne führendes "Damit/Somit ist": "Dieser Antrag Drucksache X ist somit ... abgelehnt"
(r"Dieser (?:Antrag|Gesetzentwurf|Änderungsantrag|Wahlvorschlag)[^.]{0,200}?(angenommen|abgelehnt|überwiesen|zurückgezogen|verabschiedet|beschlossen)", "direct_broad"),
# Überweisungs-Anchor (Drucksache muss rückwärts gesucht werden)
(r"(?:Damit|Somit) ist (?:diese|die)\s+Überweisungsempfehlung\s+(einstimmig\s+|ebenso\s+)?(angenommen)", "ueber"),
(r"Somit ist das so beschlossen()()", "ueber"),
(r"Damit ist das so beschlossen()()", "ueber"),
# "Damit schließt sich der Landtag der Empfehlung des Rechtsausschusses an" — Empfehlung-Beitritt
(r"Damit schließt sich der Landtag der Empfehlung[^.]{0,100}?an()()", "ueber"),
# Petitionsausschuss-Sammel-Abstimmung
(r"Damit sind die Beschlüsse des Petitionsausschusses[^.]{0,100}?bestätigt()()", "petition"),
# Übersicht-Bestätigung (§ 82 Abs. 2 GO)
(r"Damit sind die in Drucksache (\d+/\d+(?:\(neu\))?) enthaltenen[^.]{0,150}?bestätigt()", "uebersicht"),
]
def find_results(text: str) -> list[dict]:
"""Finde alle Result-Anchors im Text.
Returns: Liste von {drucksache, ergebnis, anchor_start, anchor_end, kind, einstimmig}.
"""
results = []
for pat, kind in RESULT_ANCHORS:
for m in re.finditer(pat, text):
groups = m.groups()
ds = None
einstimmig = False
span_text = text[m.start():m.end()]
# Für "direct" kind: erste DS-artige Group ist die Drucksache
if kind == "direct":
for g in groups:
if g and re.match(r"^\d+/\d+(?:\(neu\))?$", g):
ds = g
break
# Für "direct_broad": Drucksache innerhalb des Match-Spans suchen
elif kind == "direct_broad":
ds_match = re.search(r"Drucksache\s+(\d+/\d+(?:\(neu\))?)", span_text)
if ds_match:
ds = ds_match.group(1)
# Ergebnis: suche bekanntes Wort in allen Groups
ergebnis = None
for g in groups:
if g and g.strip() == "einstimmig":
einstimmig = True
if g and g.strip() in ("angenommen", "abgelehnt", "überwiesen", "zurückgezogen", "verabschiedet", "beschlossen"):
ergebnis = g.strip()
# "verabschiedet" = angenommen und verabschiedet (Gesetzentwurf)
# "beschlossen" (bei direkter Abstimmung) = angenommen
if ergebnis in ("verabschiedet", "beschlossen"):
ergebnis = "angenommen"
if kind == "ueber":
ergebnis = "überwiesen"
if "einstimmig" in text[m.start():m.end() + 5]:
einstimmig = True
# "Damit ist das so beschlossen" / "Somit ist das so beschlossen" = implizit einstimmig
if "so beschlossen" in text[m.start():m.end() + 5]:
einstimmig = True
if kind == "petition":
ergebnis = "sammel"
einstimmig = True
if kind == "uebersicht":
ergebnis = "bestätigt"
einstimmig = True
# Drucksache ist in Group[0] des Patterns
for g in groups:
if g and re.match(r"^\d+/\d+(?:\(neu\))?$", g):
ds = g
break
if not ergebnis:
continue
results.append({
"drucksache": ds,
"ergebnis": ergebnis,
"kind": kind,
"einstimmig": einstimmig,
"anchor_start": m.start(),
"anchor_end": m.end(),
})
results.sort(key=lambda r: r["anchor_start"])
dedup = []
seen_positions = set()
for r in results:
if r["anchor_start"] in seen_positions:
continue
seen_positions.add(r["anchor_start"])
dedup.append(r)
return dedup
def resolve_drucksache_for_ueber(text: str, anchor_start: int) -> str | None:
"""Für Überweisungs-Anchors: rückwärts die nächste Drucksache-Nr suchen."""
# Schaue bis 2000 chars zurück
window_start = max(0, anchor_start - 2000)
window = text[window_start:anchor_start]
# Letzte Drucksache vor dem Anchor
matches = list(re.finditer(r"Drucksache\s+(\d+/\d+(?:\(neu\))?)", window))
if not matches:
return None
return matches[-1].group(1)
def normalize_text(text: str) -> str:
"""Normalisiere PDF-Text: Worttrennungen (-\n) auflösen, Zeilenumbrüche zu Spaces."""
# Worttrennung am Zeilenende: "Überweisungs-\nempfehlung" → "Überweisungsempfehlung"
text = re.sub(r"-\s*\n\s*", "", text)
# Alle restlichen Zeilenumbrüche zu Spaces
text = re.sub(r"\s+", " ", text)
return text
def parse_protocol(pdf_path: str) -> list[dict]:
doc = fitz.open(pdf_path)
full = "".join(page.get_text() for page in doc)
doc.close()
full = normalize_text(full)
anchors = find_results(full)
parsed = []
# Segment-Boundaries: jede Abstimmung beginnt mit einer dieser Phrasen
segment_starts = [m.start() for m in re.finditer(
r"(?:(?:Damit|Somit) kommen wir (?:zur|somit zur) Abstimmung|Wir kommen (?:somit )?zur Abstimmung|Wir stimmen(?!\s+zu\?)|(?:Somit|Damit) kommen wir (?:direkt )?zu den Abstimmungen|Wir stimmen zweitens|gehen (?:wir )?zur Abstimmung über|Somit kommen wir sofort zur Abstimmung)",
full
)]
def segment_start_for(anchor_pos: int) -> int:
"""Letzte Segment-Grenze vor dem Anchor."""
candidates = [s for s in segment_starts if s < anchor_pos]
return candidates[-1] if candidates else max(0, anchor_pos - 1500)
for a in anchors:
ds = a["drucksache"]
if not ds:
ds = resolve_drucksache_for_ueber(full, a["anchor_start"])
if not ds:
continue
# Vote-Block: vom letzten Segment-Start bis zum Anchor
block_start = segment_start_for(a["anchor_start"])
block = full[block_start:a["anchor_end"]]
# Einstimmig: immer alle ja, unabhängig davon was das Fenster sagt
if a["einstimmig"]:
votes = {"ja": list(ALLE_FRAKTIONEN_NRW), "nein": [], "enthaltung": []}
else:
votes = _parse_vote_block(block)
# Fallback-Einstimmig: wenn ein Überweisungs-Anchor keinen eigenen
# "Wer stimmt ... zu?"-Block hat (stattdessen nur inverse Form
# "Wer stimmt gegen ...?"), ist das in der Praxis einstimmig.
if a["kind"] == "ueber" and not votes["ja"] and not votes["nein"] and not votes["enthaltung"]:
votes = {"ja": list(ALLE_FRAKTIONEN_NRW), "nein": [], "enthaltung": []}
parsed.append({
"drucksache": ds,
"ergebnis": a["ergebnis"],
"votes": votes,
"anchor_pos": a["anchor_start"],
})
return parsed
def compare_to_fixture(parsed: list[dict], fixture: dict) -> tuple[int, list]:
"""Vergleiche Parser-Output gegen Ground-Truth-Fixture."""
parsed_map = {}
for p in parsed:
parsed_map.setdefault(p["drucksache"], []).append(p)
errors = []
matches = 0
for gt in fixture["drucksachen"]:
ds = gt["drucksache"]
gt_erg = gt["ergebnis"]
if ds not in parsed_map:
if gt_erg == "nicht_gesondert_abgestimmt":
# Korrekt NICHT gefunden
matches += 1
continue
errors.append(f"{ds}: NOT FOUND")
continue
if gt_erg == "nicht_gesondert_abgestimmt":
errors.append(f"{ds}: expected nicht_gesondert, but parser found it")
continue
# Pick the one closest to expected — if multiple, take the first
candidates = parsed_map[ds]
p = candidates[0]
gt_erg = gt["ergebnis"]
if gt_erg == "nicht_gesondert_abgestimmt":
# Erwartetes Verhalten: Parser sollte es NICHT finden
continue
ok = True
if p["ergebnis"] != gt_erg:
errors.append(f"{ds}: ergebnis {p['ergebnis']} != {gt_erg}")
ok = False
if sorted(p["votes"]["ja"]) != sorted(gt["ja"]):
errors.append(f"{ds}: ja {p['votes']['ja']} != {gt['ja']}")
ok = False
if sorted(p["votes"]["nein"]) != sorted(gt["nein"]):
errors.append(f"{ds}: nein {p['votes']['nein']} != {gt['nein']}")
ok = False
if sorted(p["votes"]["enthaltung"]) != sorted(gt["enthaltung"]):
errors.append(f"{ds}: enth {p['votes']['enthaltung']} != {gt['enthaltung']}")
ok = False
if ok:
matches += 1
return matches, errors
if __name__ == "__main__":
pdf = "/tmp/mmp18-119.pdf"
fixture_path = "/tmp/nrw_fixture.json"
fixture = json.load(open(fixture_path))
parsed = parse_protocol(pdf)
print(f"Parsed {len(parsed)} Abstimmungen gesamt")
matches, errors = compare_to_fixture(parsed, fixture)
print(f"Match gegen Fixture: {matches}/{len(fixture['drucksachen']) - 1} (ohne nicht_gesondert)")
print()
if errors:
print("Fehler:")
for e in errors:
print(f" {e}")