"""Bundestag (BUND) — Plenarprotokoll-Parser (#106 / #148, ADR 0009). XML-basierter Parser für Bundestags-Plenarprotokolle. Quelle: ``https://dserver.bundestag.de/btp/{wp}/{wp}{n:03}.xml`` (auch .pdf verfuegbar; XML ist strukturierter, daher bevorzugt). ## Anchor-Sprache (verifiziert WP20 Sitzungen 30, 100) Bundestag formuliert Beschluesse mit: ``` Die Beschlussempfehlung ist mit den Stimmen der Koalitionsfraktionen und der Fraktion Die Linke gegen die Stimmen der CDU/CSU-Fraktion bei Enthaltung der AfD-Fraktion angenommen. ``` Pattern: - Subjekt: "Die Beschlussempfehlung", "Der Überweisungsvorschlag", "Der Antrag", "Der Gesetzentwurf" - Vote-Block: "mit den Stimmen X gegen die Stimmen Y bei Enthaltung Z" - Anchor-Verb: "angenommen" oder "abgelehnt" ## Fraktions-Mapping Koalitions-/Oppositions-Bezeichnungen aendern sich pro Wahlperiode. Aktuell hardcoded fuer **WP20** (2021-2025, Ampel): - "Koalitionsfraktionen" → SPD + GRÜNE + FDP - "Oppositionsfraktionen" → CDU/CSU + AfD + LINKE WP21 (ab 2025) wuerde anderes Mapping brauchen. Folge-Issue notwendig. ## Drucksachen-Aufloesung Vor dem Anchor wird rueckwaerts nach "Drucksache 20/N" oder "auf Drucksache 20/N" gesucht. Der naechste Match in einem 1500-Zeichen- Fenster gewinnt. """ from __future__ import annotations import re import xml.etree.ElementTree as ET from typing import Optional # WP20 (2021-2025) Koalition: SPD + GRÜNE + FDP. Opposition: CDU/CSU + AfD + LINKE. # WP21 Implementierung erfordert separates Mapping pro WP — folgt sobald gebraucht. WP20_KOALITIONSFRAKTIONEN = ["SPD", "GRÜNE", "FDP"] WP20_OPPOSITIONSFRAKTIONEN = ["CDU/CSU", "AfD", "LINKE"] # Phrase → kanonische Fraktions-Codes. Reihenfolge: längere Aliasse zuerst. FRAKTIONEN_MAP_BT = [ ("Koalitionsfraktionen", WP20_KOALITIONSFRAKTIONEN), ("Koalitionsfraktion", WP20_KOALITIONSFRAKTIONEN), ("Oppositionsfraktionen", WP20_OPPOSITIONSFRAKTIONEN), ("Oppositionsfraktion", WP20_OPPOSITIONSFRAKTIONEN), ("Fraktion Bündnis 90/Die Grünen", ["GRÜNE"]), ("Bündnis 90/Die Grünen", ["GRÜNE"]), ("Fraktion Die Linke", ["LINKE"]), ("Die Linke", ["LINKE"]), ("CDU/CSU-Fraktion", ["CDU/CSU"]), ("Fraktion der CDU/CSU", ["CDU/CSU"]), ("CDU/CSU", ["CDU/CSU"]), ("SPD-Fraktion", ["SPD"]), ("Fraktion der SPD", ["SPD"]), ("SPD", ["SPD"]), ("FDP-Fraktion", ["FDP"]), ("Fraktion der FDP", ["FDP"]), ("FDP", ["FDP"]), ("AfD-Fraktion", ["AfD"]), ("Fraktion der AfD", ["AfD"]), ("AfD", ["AfD"]), ] ALL_BT_FRAKTIONEN = ["CDU/CSU", "SPD", "GRÜNE", "FDP", "AfD", "LINKE"] def _normalize_fraktionen_bt(text: str) -> list[str]: """Extrahiere BT-Fraktions-Codes aus einer Phrase.""" found = set() remaining = text for phrase, codes in FRAKTIONEN_MAP_BT: if phrase in remaining: for c in codes: found.add(c) remaining = remaining.replace(phrase, " ") return sorted(found) # Result-Anchor: Subjekt + "ist mit den Stimmen [...] (angenommen|abgelehnt)" # Großzügige 500-char-Begrenzung weil BT-Vote-Blocks lang werden koennen. RESULT_ANCHOR_RE = re.compile( r"(?PDie Beschlussempfehlung|Der Überweisungsvorschlag|Der Antrag" r"|Der Gesetzentwurf|Diese Beschlussempfehlung)" r"\s+ist\s+mit den Stimmen(?P[^.]{20,500}?)" r"\s+(?Pangenommen|abgelehnt)\s*\.", re.DOTALL, ) def _parse_vote_block_bt(votes_text: str) -> dict: """Parst BT-Vote-Phrase: 'X gegen die Stimmen Y bei Enthaltung Z'.""" result = {"ja": [], "nein": [], "enthaltung": []} # Aufsplit-Marker nein_idx = votes_text.find("gegen die Stimmen") enth_idx = votes_text.find("bei Enthaltung") # Boundaries end_ja = min(idx for idx in (nein_idx, enth_idx, len(votes_text)) if idx >= 0) ja_text = votes_text[:end_ja] result["ja"] = _normalize_fraktionen_bt(ja_text) if nein_idx >= 0: end_nein = enth_idx if enth_idx > nein_idx else len(votes_text) nein_text = votes_text[nein_idx + len("gegen die Stimmen"):end_nein] result["nein"] = _normalize_fraktionen_bt(nein_text) if enth_idx >= 0: enth_text = votes_text[enth_idx + len("bei Enthaltung"):] result["enthaltung"] = _normalize_fraktionen_bt(enth_text) return result # Drucksache-Pattern fuer rueckwaerts-Lookup: "Drucksache 20/123" oder # "auf Drucksache 20/123(neu)" — nehmen die letzten 1500 Zeichen vor dem # Anchor. DS_RE_BT = re.compile(r"Drucksache\s+(\d{1,2}/\d{2,5}(?:\(neu\))?)") def _resolve_drucksache_bt(text: str, anchor_start: int) -> Optional[str]: """Rueckwaerts vom Anchor die letzte erwaehnte Drucksache finden.""" window_start = max(0, anchor_start - 1500) window = text[window_start:anchor_start] matches = list(DS_RE_BT.finditer(window)) if matches: return matches[-1].group(1) return None def _extract_full_text(xml_path: str) -> str: """Extrahiere den Volltext aus einem BT-Plenarprotokoll-XML.""" tree = ET.parse(xml_path) text = ET.tostring(tree.getroot(), encoding="unicode", method="text") # Whitespace normalisieren: alles auf Single-Space, wie im NRW-Parser text = re.sub(r"\s+", " ", text) return text def parse_protocol(xml_path: str) -> list[dict]: """Parst ein Bundestags-Plenarprotokoll-XML und liefert Vote-Records.""" text = _extract_full_text(xml_path) results = [] for m in RESULT_ANCHOR_RE.finditer(text): subject = m.group("subject") ergebnis = m.group("ergebnis") # angenommen | abgelehnt votes_text = m.group("votes") ds = _resolve_drucksache_bt(text, m.start()) if not ds: continue votes = _parse_vote_block_bt(votes_text) # einstimmig-Heuristik: alle 6 BT-Fraktionen in ja, nichts in nein/enth einstimmig = ( len(votes["ja"]) >= 5 # mind. 5 von 6 → praktisch einstimmig and not votes["nein"] and not votes["enthaltung"] ) # Subjekt → kind-Klassifikation if "Überweisungsvorschlag" in subject: kind = "ueberweisung" # Ueberweisungen sind typischerweise faktisch ergebnis="ueberwiesen" ergebnis = "überwiesen" if ergebnis == "angenommen" else ergebnis elif "Gesetzentwurf" in subject: kind = "gesetzentwurf" else: kind = "direct" results.append({ "drucksache": ds, "ergebnis": ergebnis, "einstimmig": einstimmig, "kind": kind, "votes": votes, "anchor_pos": m.start(), }) # Dedup ueber (drucksache, anchor_pos): falls ein Anchor mehrfach matched seen = set() deduped = [] for r in results: key = (r["drucksache"], r["anchor_pos"]) if key in seen: continue seen.add(key) deduped.append(r) return deduped