"""Berlin (BE) — Plenarprotokoll-Parser (#106 / #150, ADR 0009). PDF-basierter Parser fuer Berliner Abgeordnetenhaus-Plenarprotokolle. Quelle: ``https://www.parlament-berlin.de/ados/{wp}/IIIPlen/protokoll/plen{wp}-{n:03}-pp.pdf`` ## Anchor-Sprache (verifiziert WP19 Sitzung 50) Berliner Abstimmungs-Sprache ist NRW-aehnlich, mit eigenem Sprach-Stil: ``` Wer den Antrag auf Drucksache 19/1589 annehmen moechte, den bitte ich jetzt um das Handzeichen. – Das sind die Fraktionen Bündnis 90/ Die Gruenen und Die Linke. Wer stimmt dagegen? – Das sind die Fraktionen der CDU, SPD und AfD. Wer enthaelt sich, pro forma? – Das ist niemand. Damit ist der Antrag abgelehnt. ``` ## Pattern-Erkennung - **Result-Anchor:** ``Damit ist [Antrag/Aenderungsantrag/...] (angenommen|abgelehnt)`` - **Vote-Block:** drei Q+A-Paare im Reden-Stil - JA: ``annehmen moechte ... – Das sind [PHRASE]`` - NEIN: ``Wer stimmt dagegen? – Das sind [PHRASE]`` - ENTH: ``Wer enthaelt sich(, pro forma)? – [PHRASE]`` - **Drucksachen-Lookup:** ``auf Drucksache 19/N`` rueckwaerts vom Anchor ## Fraktions-Mapping WP19 - ``Buendnis 90/Die Gruenen`` → GRÜNE - ``Die Linke`` → LINKE - ``CDU``, ``SPD``, ``AfD``, ``FDP`` - ``fraktionsloser Abgeordneter`` → ignoriert (Einzelpersonen) """ from __future__ import annotations import re from typing import Optional try: import fitz # PyMuPDF except ImportError: fitz = None ALLE_FRAKTIONEN_BE = ["CDU", "SPD", "GRÜNE", "LINKE", "AfD", "FDP"] # Reihenfolge: längere Aliasse zuerst. FRAKTIONEN_MAP_BE = [ ("Bündnis 90/Die Grünen", ["GRÜNE"]), ("Bündnisses 90/Die Grünen", ["GRÜNE"]), ("Bündnis 90", ["GRÜNE"]), ("Die Linke", ["LINKE"]), ("der Linken", ["LINKE"]), ("Linke", ["LINKE"]), ("CDU", ["CDU"]), ("SPD", ["SPD"]), ("AfD", ["AfD"]), ("FDP", ["FDP"]), ] def _normalize_fraktionen_be(text: str) -> list[str]: """Extrahiere BE-Fraktions-Codes aus einer Phrase.""" found = set() remaining = text for phrase, codes in FRAKTIONEN_MAP_BE: if phrase in remaining: for c in codes: found.add(c) remaining = remaining.replace(phrase, " ") return sorted(found) # Result-Anchor: "Damit ist [Subjekt] (angenommen|abgelehnt)" RESULT_ANCHOR_RE = re.compile( r"Damit ist\s+(?:auch\s+)?(?:dieser?|die|das|der)\s+" r"(?PAntrag|Änderungsantrag|Gesetzesvorlage|Gesetzentwurf|" r"Entschließungsantrag|Beschlussempfehlung)" r"[^.]{0,200}?(?Pangenommen|abgelehnt)\.", re.DOTALL, ) # Vote-Sub-Patterns: 3 Reden-Q+A-Paare. Wir akzeptieren Punkte zwischen # der Frage und dem Vote-Block ("möchte. – Das sind ..."), und stoppen am # nächsten Q-Marker oder Damit-Anchor. JA_RE = re.compile( r"annehmen m(?:ö|oe)chte[^?]{0,200}?[–-]\s+(?:Das sind\s+|Das ist\s+)?" r"(?P[^?]+?)(?=Wer stimmt dagegen|Wer enthält sich|Wer enthaelt sich|Damit ist)", re.DOTALL, ) NEIN_RE = re.compile( r"Wer stimmt dagegen\?[^?]{0,40}?[–-]\s+(?:Das sind\s+|Das ist\s+)?" r"(?P[^?]+?)(?=Wer enthält sich|Wer enthaelt sich|Damit ist)", re.DOTALL, ) ENTH_RE = re.compile( r"Wer enth(?:ä|ae)lt sich(?:,\s+pro forma)?\?[^?]{0,40}?[–-]\s+" r"(?P[^?.]+?)(?=Damit ist|Wer|\.)", re.DOTALL, ) DS_RE_BE = re.compile(r"Drucksache\s+(\d{1,2}/\d{2,5}(?:-\d+)?)") def _parse_vote_block_be(block: str) -> dict: """Parst BE-Vote-Block aus dem Text vor einem Damit-Anchor.""" votes = {"ja": [], "nein": [], "enthaltung": []} ja_m = JA_RE.search(block) if ja_m: votes["ja"] = _normalize_fraktionen_be(ja_m.group("ja")) nein_m = NEIN_RE.search(block) if nein_m: votes["nein"] = _normalize_fraktionen_be(nein_m.group("nein")) enth_m = ENTH_RE.search(block) if enth_m: enth_text = enth_m.group("enth") # 'niemand' → leere Liste (Berliner Idiom) if "niemand" in enth_text.lower() or "ist nicht der Fall" in enth_text: votes["enthaltung"] = [] else: votes["enthaltung"] = _normalize_fraktionen_be(enth_text) return votes def _resolve_drucksache_be(text: str, anchor_start: int) -> Optional[str]: """Rueckwaerts vom Anchor die Drucksache finden (1500-Zeichen Window).""" window_start = max(0, anchor_start - 1500) window = text[window_start:anchor_start] matches = list(DS_RE_BE.finditer(window)) if matches: return matches[-1].group(1) return None def _normalize_text(text: str) -> str: """Whitespace-Normalisierung wie NRW-Parser.""" text = re.sub(r"\s+", " ", text) return text def parse_protocol(pdf_path: str) -> list[dict]: """Parst ein Berliner Plenarprotokoll-PDF und liefert Vote-Records.""" if fitz is None: raise ImportError("PyMuPDF (fitz) ist erforderlich fuer den BE-Parser") doc = fitz.open(pdf_path) full = "".join(p.get_text() for p in doc) doc.close() full = _normalize_text(full) results = [] for m in RESULT_ANCHOR_RE.finditer(full): ergebnis = m.group("ergebnis") # Vote-Block: 1500 Zeichen vor dem Anchor block_start = max(0, m.start() - 1500) block = full[block_start:m.end()] ds = _resolve_drucksache_be(full, m.start()) if not ds: continue votes = _parse_vote_block_be(block) einstimmig = ( len(votes["ja"]) >= 5 and not votes["nein"] and not votes["enthaltung"] ) results.append({ "drucksache": ds, "ergebnis": ergebnis, "einstimmig": einstimmig, "kind": "direct", "votes": votes, "anchor_pos": m.start(), }) # Dedup ueber anchor_pos seen = set() deduped = [] for r in results: if r["anchor_pos"] in seen: continue seen.add(r["anchor_pos"]) deduped.append(r) return deduped