gwoe-antragspruefer/app/protokoll_parsers/bund.py

203 lines
6.8 KiB
Python
Raw Normal View History

"""Bundestag (BUND) — Plenarprotokoll-Parser (#106 / #148, ADR 0009).
XML-basierter Parser für Bundestags-Plenarprotokolle. Quelle:
``https://dserver.bundestag.de/btp/{wp}/{wp}{n:03}.xml`` (auch .pdf
verfuegbar; XML ist strukturierter, daher bevorzugt).
## Anchor-Sprache (verifiziert WP20 Sitzungen 30, 100)
Bundestag formuliert Beschluesse mit:
```
Die Beschlussempfehlung ist mit den Stimmen der Koalitionsfraktionen
und der Fraktion Die Linke gegen die Stimmen der CDU/CSU-Fraktion
bei Enthaltung der AfD-Fraktion angenommen.
```
Pattern:
- Subjekt: "Die Beschlussempfehlung", "Der Überweisungsvorschlag",
"Der Antrag", "Der Gesetzentwurf"
- Vote-Block: "mit den Stimmen X gegen die Stimmen Y bei Enthaltung Z"
- Anchor-Verb: "angenommen" oder "abgelehnt"
## Fraktions-Mapping
Koalitions-/Oppositions-Bezeichnungen aendern sich pro Wahlperiode.
Aktuell hardcoded fuer **WP20** (2021-2025, Ampel):
- "Koalitionsfraktionen" SPD + GRÜNE + FDP
- "Oppositionsfraktionen" CDU/CSU + AfD + LINKE
WP21 (ab 2025) wuerde anderes Mapping brauchen. Folge-Issue notwendig.
## Drucksachen-Aufloesung
Vor dem Anchor wird rueckwaerts nach "Drucksache 20/N" oder
"auf Drucksache 20/N" gesucht. Der naechste Match in einem 1500-Zeichen-
Fenster gewinnt.
"""
from __future__ import annotations
import re
import xml.etree.ElementTree as ET
from typing import Optional
# WP20 (2021-2025) Koalition: SPD + GRÜNE + FDP. Opposition: CDU/CSU + AfD + LINKE.
# WP21 Implementierung erfordert separates Mapping pro WP — folgt sobald gebraucht.
WP20_KOALITIONSFRAKTIONEN = ["SPD", "GRÜNE", "FDP"]
WP20_OPPOSITIONSFRAKTIONEN = ["CDU/CSU", "AfD", "LINKE"]
# Phrase → kanonische Fraktions-Codes. Reihenfolge: längere Aliasse zuerst.
FRAKTIONEN_MAP_BT = [
("Koalitionsfraktionen", WP20_KOALITIONSFRAKTIONEN),
("Koalitionsfraktion", WP20_KOALITIONSFRAKTIONEN),
("Oppositionsfraktionen", WP20_OPPOSITIONSFRAKTIONEN),
("Oppositionsfraktion", WP20_OPPOSITIONSFRAKTIONEN),
("Fraktion Bündnis 90/Die Grünen", ["GRÜNE"]),
("Bündnis 90/Die Grünen", ["GRÜNE"]),
("Fraktion Die Linke", ["LINKE"]),
("Die Linke", ["LINKE"]),
("CDU/CSU-Fraktion", ["CDU/CSU"]),
("Fraktion der CDU/CSU", ["CDU/CSU"]),
("CDU/CSU", ["CDU/CSU"]),
("SPD-Fraktion", ["SPD"]),
("Fraktion der SPD", ["SPD"]),
("SPD", ["SPD"]),
("FDP-Fraktion", ["FDP"]),
("Fraktion der FDP", ["FDP"]),
("FDP", ["FDP"]),
("AfD-Fraktion", ["AfD"]),
("Fraktion der AfD", ["AfD"]),
("AfD", ["AfD"]),
]
ALL_BT_FRAKTIONEN = ["CDU/CSU", "SPD", "GRÜNE", "FDP", "AfD", "LINKE"]
def _normalize_fraktionen_bt(text: str) -> list[str]:
"""Extrahiere BT-Fraktions-Codes aus einer Phrase."""
found = set()
remaining = text
for phrase, codes in FRAKTIONEN_MAP_BT:
if phrase in remaining:
for c in codes:
found.add(c)
remaining = remaining.replace(phrase, " ")
return sorted(found)
# Result-Anchor: Subjekt + "ist mit den Stimmen [...] (angenommen|abgelehnt)"
# Großzügige 500-char-Begrenzung weil BT-Vote-Blocks lang werden koennen.
RESULT_ANCHOR_RE = re.compile(
r"(?P<subject>Die Beschlussempfehlung|Der Überweisungsvorschlag|Der Antrag"
r"|Der Gesetzentwurf|Diese Beschlussempfehlung)"
r"\s+ist\s+mit den Stimmen(?P<votes>[^.]{20,500}?)"
r"\s+(?P<ergebnis>angenommen|abgelehnt)\s*\.",
re.DOTALL,
)
def _parse_vote_block_bt(votes_text: str) -> dict:
"""Parst BT-Vote-Phrase: 'X gegen die Stimmen Y bei Enthaltung Z'."""
result = {"ja": [], "nein": [], "enthaltung": []}
# Aufsplit-Marker
nein_idx = votes_text.find("gegen die Stimmen")
enth_idx = votes_text.find("bei Enthaltung")
# Boundaries
end_ja = min(idx for idx in (nein_idx, enth_idx, len(votes_text)) if idx >= 0)
ja_text = votes_text[:end_ja]
result["ja"] = _normalize_fraktionen_bt(ja_text)
if nein_idx >= 0:
end_nein = enth_idx if enth_idx > nein_idx else len(votes_text)
nein_text = votes_text[nein_idx + len("gegen die Stimmen"):end_nein]
result["nein"] = _normalize_fraktionen_bt(nein_text)
if enth_idx >= 0:
enth_text = votes_text[enth_idx + len("bei Enthaltung"):]
result["enthaltung"] = _normalize_fraktionen_bt(enth_text)
return result
# Drucksache-Pattern fuer rueckwaerts-Lookup: "Drucksache 20/123" oder
# "auf Drucksache 20/123(neu)" — nehmen die letzten 1500 Zeichen vor dem
# Anchor.
DS_RE_BT = re.compile(r"Drucksache\s+(\d{1,2}/\d{2,5}(?:\(neu\))?)")
def _resolve_drucksache_bt(text: str, anchor_start: int) -> Optional[str]:
"""Rueckwaerts vom Anchor die letzte erwaehnte Drucksache finden."""
window_start = max(0, anchor_start - 1500)
window = text[window_start:anchor_start]
matches = list(DS_RE_BT.finditer(window))
if matches:
return matches[-1].group(1)
return None
def _extract_full_text(xml_path: str) -> str:
"""Extrahiere den Volltext aus einem BT-Plenarprotokoll-XML."""
tree = ET.parse(xml_path)
text = ET.tostring(tree.getroot(), encoding="unicode", method="text")
# Whitespace normalisieren: alles auf Single-Space, wie im NRW-Parser
text = re.sub(r"\s+", " ", text)
return text
def parse_protocol(xml_path: str) -> list[dict]:
"""Parst ein Bundestags-Plenarprotokoll-XML und liefert Vote-Records."""
text = _extract_full_text(xml_path)
results = []
for m in RESULT_ANCHOR_RE.finditer(text):
subject = m.group("subject")
ergebnis = m.group("ergebnis") # angenommen | abgelehnt
votes_text = m.group("votes")
ds = _resolve_drucksache_bt(text, m.start())
if not ds:
continue
votes = _parse_vote_block_bt(votes_text)
# einstimmig-Heuristik: alle 6 BT-Fraktionen in ja, nichts in nein/enth
einstimmig = (
len(votes["ja"]) >= 5 # mind. 5 von 6 → praktisch einstimmig
and not votes["nein"]
and not votes["enthaltung"]
)
# Subjekt → kind-Klassifikation
if "Überweisungsvorschlag" in subject:
kind = "ueberweisung"
# Ueberweisungen sind typischerweise faktisch ergebnis="ueberwiesen"
ergebnis = "überwiesen" if ergebnis == "angenommen" else ergebnis
elif "Gesetzentwurf" in subject:
kind = "gesetzentwurf"
else:
kind = "direct"
results.append({
"drucksache": ds,
"ergebnis": ergebnis,
"einstimmig": einstimmig,
"kind": kind,
"votes": votes,
"anchor_pos": m.start(),
})
# Dedup ueber (drucksache, anchor_pos): falls ein Anchor mehrfach matched
seen = set()
deduped = []
for r in results:
key = (r["drucksache"], r["anchor_pos"])
if key in seen:
continue
seen.add(key)
deduped.append(r)
return deduped