gwoe-antragspruefer/app/abgeordnetenwatch.py

286 lines
10 KiB
Python
Raw Normal View History

"""Adapter für abgeordnetenwatch.de API v2 (#106 Phase 1).
Liefert strukturierte Abstimmungsdaten (namentliche Abstimmungen)
pro Bundesland + Bundestag. Daten werden lokal in abgeordnetenwatch_polls
und abgeordnetenwatch_votes gecacht.
API-Docs: https://www.abgeordnetenwatch.de/api/v2
"""
from __future__ import annotations
import logging
import re
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
# Mapping unserer BL-Codes auf abgeordnetenwatch parliament-IDs.
# IDs aus GET /api/v2/parliaments (Stand April 2026).
PARLIAMENT_ID: dict[str, int] = {
"BT": 5, # Bundestag (auch "BUND")
"BUND": 5, # Alias
"NRW": 4,
"BE": 2, # Berlin
"HH": 3, # Hamburg
"BW": 6, # Baden-Württemberg
"RP": 7, # Rheinland-Pfalz
"LSA": 8, # Sachsen-Anhalt
"MV": 9, # Mecklenburg-Vorpommern
"HB": 10, # Bremen
"HE": 11, # Hessen
"NI": 12, # Niedersachsen
"BY": 13, # Bayern
"SL": 14, # Saarland
"TH": 15, # Thüringen
"BB": 16, # Brandenburg
"SN": 17, # Sachsen
"SH": 18, # Schleswig-Holstein
}
_BASE = "https://www.abgeordnetenwatch.de/api/v2"
# Drucksachen-Extraktion aus field_intro-HTML — pro Landtag eigenes URL-/
# Dateinamen-Schema. Reihenfolge: erst Generic-Pattern "WP/NR" probieren
# (BUND, HE), dann BL-spezifische Patterns aus den Drucksachen-PDF-URLs.
_DS_PATTERNS: list[re.Pattern] = [
# Generic: "20/12345" — BUND, HE und ähnliche
re.compile(r"\b(\d{1,2})/(\d{3,5})\b"),
# NRW: MMD18-2142.pdf
re.compile(r"MMD(\d{1,2})-(\d{3,5})\.pdf", re.IGNORECASE),
# BE: d19-0564.pdf
re.compile(r"/d(\d{1,2})-(\d{4})\.pdf", re.IGNORECASE),
# BW: 17_7713_D.pdf
re.compile(r"/(\d{1,2})_(\d{3,5})_D\.pdf", re.IGNORECASE),
# HB: D21L0568.pdf (D<wp>L<nr>)
re.compile(r"/D(\d{1,2})L(\d{3,5})\.pdf", re.IGNORECASE),
# SH: drucksache-20-00187.pdf
re.compile(r"drucksache-(\d{1,2})-(\d{3,5})\.pdf", re.IGNORECASE),
# SL: Gs17_0503.pdf
re.compile(r"/Gs(\d{1,2})_(\d{3,5})\.pdf", re.IGNORECASE),
# LSA: wp8/drs/d0145… (Reihenfolge: wp dann nr)
re.compile(r"/wp(\d{1,2})/drs/d(\d{3,5})", re.IGNORECASE),
# SN: dok_nr=2150&...&leg_per=8 — params können in beliebiger Reihenfolge auftreten
re.compile(r"dok_nr=(\d{3,5}).*leg_per=(\d{1,2})", re.IGNORECASE),
# RP: 538-18.pdf (Reihenfolge: nr-wp)
re.compile(r"/(\d{3,5})-(\d{1,2})\.pdf", re.IGNORECASE),
]
def extract_drucksache_from_intro(html: str) -> Optional[str]:
"""Extrahiert die erste Drucksachen-Nummer aus dem field_intro-HTML.
Probiert mehrere Landtags-spezifische URL-Patterns durch (NRW MMD<wp>-<nr>,
BW <wp>_<nr>_D.pdf, etc.) und gibt die erste Fundstelle als
"<wp>/<nr>"-String zurück. Reihenfolge im Match-Tupel ist immer (wp, nr)
die Patterns selbst kümmern sich um eventuelle URL-Reihenfolgen-Eigenheiten
(RP hat z.B. nr-wp, SN hat dok_nr=...&leg_per=..., dort drehen wir).
"""
if not html:
return None
for pat in _DS_PATTERNS:
m = pat.search(html)
if not m:
continue
# Spezialfall RP: nr-wp im URL → drehen, damit Output wp/nr
if "-" in m.re.pattern and m.re.pattern.startswith("/(\\d{3,5})"):
return f"{m.group(2)}/{m.group(1)}"
# Spezialfall SN: dok_nr (Gruppe 1) + leg_per (Gruppe 2) → wp/nr
if "dok_nr" in m.re.pattern:
return f"{m.group(2)}/{m.group(1)}"
# Standard: (wp, nr)
return f"{m.group(1)}/{m.group(2)}"
return None
async def fallback_drucksache_by_date_title(
datum: Optional[str],
titel: Optional[str],
bundesland: str,
) -> Optional[str]:
"""Fallback-Drucksachen-Lookup via Datum + Titel gegen die Assessments-DB.
Wird aufgerufen wenn ``extract_drucksache_from_intro`` kein Pattern findet
(betrifft MV/BY/BB/TH/HH/SL deren intro-HTML keine PDF-URLs enthält).
Sucht Assessments für ``bundesland`` innerhalb von ±14 Tagen um ``datum``
und einem Titel-Substring-Match. Gibt die Drucksachen-Nummer des ersten
Treffers zurück oder ``None``.
Args:
datum: ISO-Datum des Polls (``field_poll_date``, z.B. ``"2026-04-01"``).
titel: Label/Titel des Polls (wird als LIKE-Substring geprüft).
bundesland: Unser BL-Code (z.B. ``"MV"``).
Returns:
Drucksachen-Nummer als String (z.B. ``"7/1234"``) oder ``None``.
"""
if not datum or not titel:
return None
# Titel-Substring: nur die ersten 40 Zeichen für den LIKE-Match verwenden,
# da Poll-Labels und Assessment-Titel leicht voneinander abweichen können.
titel_substr = titel.strip()[:40]
from .config import settings as _settings
import aiosqlite as _aio
async with _aio.connect(_settings.db_path) as db:
cur = await db.execute(
"""
SELECT drucksache FROM assessments
WHERE bundesland = ?
AND ABS(julianday(datum) - julianday(?)) < 14
AND LOWER(title) LIKE ?
ORDER BY ABS(julianday(datum) - julianday(?))
LIMIT 1
""",
(bundesland.upper(), datum, f"%{titel_substr.lower()}%", datum),
)
row = await cur.fetchone()
if row:
logger.debug(
"fallback_drucksache_by_date_title: %s/%s%s",
bundesland, datum, row[0],
)
return row[0]
return None
async def fetch_polls(bundesland_code: str, limit: int = 100) -> list[dict]:
"""Holt aktuelle Abstimmungen für ein Bundesland von abgeordnetenwatch.
Gibt eine Liste von Poll-Dicts zurück; jedes Dict enthält zusätzlich
den geparsten Key ``drucksache`` (kann None sein).
Args:
bundesland_code: Unser BL-Code (z.B. "NRW", "BT", "BUND").
limit: Maximale Anzahl Polls; wird als range_end übergeben.
Returns:
Liste von Poll-Dicts mit den Feldern aus der API plus ``drucksache``.
Raises:
ValueError: Wenn der bundesland_code nicht in PARLIAMENT_ID ist.
httpx.HTTPError: Bei Netzwerkproblemen.
"""
parliament_id = PARLIAMENT_ID.get(bundesland_code.upper())
if parliament_id is None:
raise ValueError(
f"Unbekannter BL-Code '{bundesland_code}'. "
f"Bekannte Codes: {sorted(PARLIAMENT_ID.keys())}"
)
async with httpx.AsyncClient(timeout=30.0) as client:
# Zuerst aktuellen ParliamentPeriod für das Parlament holen —
# /polls filtert nach field_legislature (period-id), NICHT parliament-id.
pp_resp = await client.get(
f"{_BASE}/parliament-periods",
params={"parliament": parliament_id, "type": "legislature", "range_end": 5},
)
pp_resp.raise_for_status()
periods = (pp_resp.json() or {}).get("data") or []
# Aktuelle Periode: sortiere nach start-date desc, nimm die neueste
current = sorted(
periods,
key=lambda x: x.get("start_date_period") or "",
reverse=True,
)
if not current:
logger.warning("Keine ParliamentPeriod für %s (parliament_id=%d)",
bundesland_code, parliament_id)
return []
period_id = current[0]["id"]
# Polls für diese Periode
resp = await client.get(
f"{_BASE}/polls",
params={"field_legislature": period_id, "range_end": limit},
)
resp.raise_for_status()
data = resp.json()
polls_raw: list[dict] = data.get("data") or []
polls = []
for p in polls_raw:
intro_html = p.get("field_intro") or ""
polls.append({
"id": p.get("id"),
"label": p.get("label") or p.get("field_poll_date", ""),
"field_poll_date": p.get("field_poll_date"),
"field_accepted": p.get("field_accepted"),
"field_topics": p.get("field_topics") or [],
"field_intro": intro_html,
"field_legislature": p.get("field_legislature") or {},
"drucksache": extract_drucksache_from_intro(intro_html),
})
logger.info(
"abgeordnetenwatch: %d polls für %s (parliament_id=%d)",
len(polls), bundesland_code, parliament_id,
)
return polls
async def fetch_votes_for_poll(poll_id: int) -> list[dict]:
"""Holt namentliche Einzelstimmen für eine Abstimmung.
Args:
poll_id: ID der Abstimmung (aus polls[].id).
Returns:
Liste von Vote-Dicts mit den Feldern:
poll_id, politician_id, politician_name, partei, vote.
vote ist einer von: "yes", "no", "abstain", "no_show".
Raises:
httpx.HTTPError: Bei Netzwerkproblemen.
"""
# /votes?poll=X funktioniert (empirisch ermittelt);
# NICHT field_poll (500) und NICHT /polls/{id}?related_data=votes
# (liefert leeres related_data). Einfaches ?poll=<id>.
url = f"{_BASE}/votes"
params = {"poll": poll_id, "range_end": 1000}
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
votes_raw: list[dict] = data.get("data") or []
votes = []
for v in votes_raw:
politician = v.get("mandate") or v.get("politician") or {}
politician_id = politician.get("id") or v.get("mandate_id")
politician_name = politician.get("label") or politician.get("name") or ""
# Partei aus politician.party oder fraction
partei = ""
party = politician.get("party") or {}
if isinstance(party, dict):
partei = party.get("label") or party.get("short_label") or ""
fraction = v.get("fraction") or {}
if not partei and isinstance(fraction, dict):
partei = fraction.get("full_name") or fraction.get("label") or ""
vote_value = (v.get("vote") or "").lower()
# API liefert "yes"/"no"/"abstain"/"no_show" — direkt übernehmen
if vote_value not in ("yes", "no", "abstain", "no_show"):
vote_value = "no_show"
votes.append({
"poll_id": poll_id,
"politician_id": politician_id,
"politician_name": politician_name,
"partei": partei,
"vote": vote_value,
})
logger.info(
"abgeordnetenwatch: %d votes für poll_id=%d", len(votes), poll_id
)
return votes