gwoe-antragspruefer/app/protokoll_parsers/hh.py

160 lines
4.6 KiB
Python
Raw Normal View History

"""Hamburg (HH) — Plenarprotokoll-Parser (#106 / #155, ADR 0009).
Hamburg publiziert kompakte **Beschlussprotokolle** (Tabellen-Form mit
Vote-Block pro Beschluss). PDF-URL-Discovery laeuft ueber die Index-Seite
``hamburgische-buergerschaft.de/recherche-info/protokolle`` (Blob-IDs
nicht direkt vorhersagbar).
## Anchor-Sprache (verifiziert WP23 Sitzung 22)
```
... Antrag der GRUENEN und SPD-Fraktion mehrheitlich mit den Stimmen
der SPD und GRUENEN gegen die Stimmen der CDU und AfD bei Enthaltung
der Linken angenommen
```
Pattern:
- ``einstimmig (angenommen|abgelehnt)`` alle Fraktionen ja/nein
- ``mehrheitlich mit den Stimmen X gegen die Stimmen Y (bei Enthaltung Z)? (angenommen|abgelehnt)``
## Fraktions-Mapping WP23
- ``GRUENE``, ``SPD``, ``CDU``, ``AfD`` (rot-gruener Senat)
- ``Linke`` / ``Linken`` LINKE
- ``Abg. {Name}`` einzelne Abgeordnete (ignorieren)
## Drucksachen-Format
``Drucksache 23/N`` oder bare ``23/N``. Drucksachen aus laufender WP
und Vor-WP gemischt im Text. Lookup nimmt die naechste DS rueckwaerts
vom Anchor.
"""
from __future__ import annotations
import re
from typing import Optional
try:
import fitz
except ImportError:
fitz = None
ALLE_FRAKTIONEN_HH = ["SPD", "GRÜNE", "CDU", "AfD", "LINKE"]
FRAKTIONEN_MAP_HH = [
("GRÜNEN", ["GRÜNE"]),
("GRÜNE", ["GRÜNE"]),
("SPD", ["SPD"]),
("CDU", ["CDU"]),
("AfD", ["AfD"]),
("Linken", ["LINKE"]),
("Linke", ["LINKE"]),
]
def _normalize_fraktionen_hh(text: str) -> list[str]:
found = set()
remaining = text
for phrase, codes in FRAKTIONEN_MAP_HH:
if phrase in remaining:
for c in codes:
found.add(c)
remaining = remaining.replace(phrase, " ")
return sorted(found)
# Result-Anchor: einstimmig oder mehrheitlich + (Vote-Block) + (angenommen|abgelehnt)
RESULT_ANCHOR_RE = re.compile(
r"(?P<modus>einstimmig|mehrheitlich)"
r"(?P<vote_block>(?:\s+mit den Stimmen[^.]{0,400})?)"
r"\s+(?P<ergebnis>angenommen|abgelehnt)",
re.DOTALL,
)
DS_RE_HH = re.compile(r"(?:Drucksache\s+)?(\d{2}/\d{3,5})")
def _parse_vote_block_hh(vote_block: str) -> dict:
"""Parst HH-Vote-Block 'mit den Stimmen X gegen die Stimmen Y bei Enthaltung Z'."""
votes = {"ja": [], "nein": [], "enthaltung": []}
if not vote_block.strip():
return votes
nein_idx = vote_block.find("gegen die Stimmen")
enth_idx = vote_block.find("bei Enthaltung")
end_ja = min(idx for idx in (nein_idx, enth_idx, len(vote_block)) if idx >= 0)
ja_text = vote_block[:end_ja]
votes["ja"] = _normalize_fraktionen_hh(ja_text)
if nein_idx >= 0:
end_nein = enth_idx if enth_idx > nein_idx else len(vote_block)
nein_text = vote_block[nein_idx + len("gegen die Stimmen"):end_nein]
votes["nein"] = _normalize_fraktionen_hh(nein_text)
if enth_idx >= 0:
enth_text = vote_block[enth_idx + len("bei Enthaltung"):]
votes["enthaltung"] = _normalize_fraktionen_hh(enth_text)
return votes
def _resolve_drucksache_hh(text: str, anchor_start: int) -> Optional[str]:
"""Rueckwaerts vom Anchor naechste Drucksache."""
window_start = max(0, anchor_start - 800)
window = text[window_start:anchor_start]
matches = list(DS_RE_HH.finditer(window))
if matches:
return matches[-1].group(1)
return None
def _normalize_text(text: str) -> str:
return re.sub(r"\s+", " ", text)
def parse_protocol(pdf_path: str) -> list[dict]:
"""Parst ein Hamburger Beschlussprotokoll-PDF."""
if fitz is None:
raise ImportError("PyMuPDF (fitz) ist erforderlich fuer den HH-Parser")
doc = fitz.open(pdf_path)
full = "".join(p.get_text() for p in doc)
doc.close()
full = _normalize_text(full)
results = []
for m in RESULT_ANCHOR_RE.finditer(full):
modus = m.group("modus")
vote_block = m.group("vote_block") or ""
ergebnis = m.group("ergebnis")
ds = _resolve_drucksache_hh(full, m.start())
if not ds:
continue
votes = _parse_vote_block_hh(vote_block)
einstimmig = modus == "einstimmig"
if einstimmig and not votes["ja"]:
votes["ja"] = list(ALLE_FRAKTIONEN_HH)
results.append({
"drucksache": ds,
"ergebnis": ergebnis,
"einstimmig": einstimmig,
"kind": "direct",
"votes": votes,
"anchor_pos": m.start(),
})
seen = set()
deduped = []
for r in results:
if r["anchor_pos"] in seen:
continue
seen.add(r["anchor_pos"])
deduped.append(r)
return deduped