"""Adapter für abgeordnetenwatch.de API v2 (#106 Phase 1). Liefert strukturierte Abstimmungsdaten (namentliche Abstimmungen) pro Bundesland + Bundestag. Daten werden lokal in abgeordnetenwatch_polls und abgeordnetenwatch_votes gecacht. API-Docs: https://www.abgeordnetenwatch.de/api/v2 """ from __future__ import annotations import logging import re from typing import Optional import httpx logger = logging.getLogger(__name__) # Mapping unserer BL-Codes auf abgeordnetenwatch parliament-IDs. # IDs aus GET /api/v2/parliaments (Stand April 2026). PARLIAMENT_ID: dict[str, int] = { "BT": 5, # Bundestag (auch "BUND") "BUND": 5, # Alias "NRW": 4, "BE": 2, # Berlin "HH": 3, # Hamburg "BW": 6, # Baden-Württemberg "RP": 7, # Rheinland-Pfalz "LSA": 8, # Sachsen-Anhalt "MV": 9, # Mecklenburg-Vorpommern "HB": 10, # Bremen "HE": 11, # Hessen "NI": 12, # Niedersachsen "BY": 13, # Bayern "SL": 14, # Saarland "TH": 15, # Thüringen "BB": 16, # Brandenburg "SN": 17, # Sachsen "SH": 18, # Schleswig-Holstein } _BASE = "https://www.abgeordnetenwatch.de/api/v2" # Drucksachen-Extraktion aus field_intro-HTML — pro Landtag eigenes URL-/ # Dateinamen-Schema. Reihenfolge: erst Generic-Pattern "WP/NR" probieren # (BUND, HE), dann BL-spezifische Patterns aus den Drucksachen-PDF-URLs. _DS_PATTERNS: list[re.Pattern] = [ # Generic: "20/12345" — BUND, HE und ähnliche re.compile(r"\b(\d{1,2})/(\d{3,5})\b"), # NRW: MMD18-2142.pdf re.compile(r"MMD(\d{1,2})-(\d{3,5})\.pdf", re.IGNORECASE), # BE: d19-0564.pdf re.compile(r"/d(\d{1,2})-(\d{4})\.pdf", re.IGNORECASE), # BW: 17_7713_D.pdf re.compile(r"/(\d{1,2})_(\d{3,5})_D\.pdf", re.IGNORECASE), # HB: D21L0568.pdf (DL) re.compile(r"/D(\d{1,2})L(\d{3,5})\.pdf", re.IGNORECASE), # SH: drucksache-20-00187.pdf re.compile(r"drucksache-(\d{1,2})-(\d{3,5})\.pdf", re.IGNORECASE), # SL: Gs17_0503.pdf re.compile(r"/Gs(\d{1,2})_(\d{3,5})\.pdf", re.IGNORECASE), # LSA: wp8/drs/d0145… (Reihenfolge: wp dann nr) re.compile(r"/wp(\d{1,2})/drs/d(\d{3,5})", re.IGNORECASE), # SN: dok_nr=2150&...&leg_per=8 — params können in beliebiger Reihenfolge auftreten re.compile(r"dok_nr=(\d{3,5}).*leg_per=(\d{1,2})", re.IGNORECASE), # RP: 538-18.pdf (Reihenfolge: nr-wp) re.compile(r"/(\d{3,5})-(\d{1,2})\.pdf", re.IGNORECASE), ] def extract_drucksache_from_intro(html: str) -> Optional[str]: """Extrahiert die erste Drucksachen-Nummer aus dem field_intro-HTML. Probiert mehrere Landtags-spezifische URL-Patterns durch (NRW MMD-, BW __D.pdf, etc.) und gibt die erste Fundstelle als "/"-String zurück. Reihenfolge im Match-Tupel ist immer (wp, nr) — die Patterns selbst kümmern sich um eventuelle URL-Reihenfolgen-Eigenheiten (RP hat z.B. nr-wp, SN hat dok_nr=...&leg_per=..., dort drehen wir). """ if not html: return None for pat in _DS_PATTERNS: m = pat.search(html) if not m: continue # Spezialfall RP: nr-wp im URL → drehen, damit Output wp/nr if "-" in m.re.pattern and m.re.pattern.startswith("/(\\d{3,5})"): return f"{m.group(2)}/{m.group(1)}" # Spezialfall SN: dok_nr (Gruppe 1) + leg_per (Gruppe 2) → wp/nr if "dok_nr" in m.re.pattern: return f"{m.group(2)}/{m.group(1)}" # Standard: (wp, nr) return f"{m.group(1)}/{m.group(2)}" return None async def fallback_drucksache_by_date_title( datum: Optional[str], titel: Optional[str], bundesland: str, ) -> Optional[str]: """Fallback-Drucksachen-Lookup via Datum + Titel gegen die Assessments-DB. Wird aufgerufen wenn ``extract_drucksache_from_intro`` kein Pattern findet (betrifft MV/BY/BB/TH/HH/SL deren intro-HTML keine PDF-URLs enthält). Sucht Assessments für ``bundesland`` innerhalb von ±14 Tagen um ``datum`` und einem Titel-Substring-Match. Gibt die Drucksachen-Nummer des ersten Treffers zurück oder ``None``. Args: datum: ISO-Datum des Polls (``field_poll_date``, z.B. ``"2026-04-01"``). titel: Label/Titel des Polls (wird als LIKE-Substring geprüft). bundesland: Unser BL-Code (z.B. ``"MV"``). Returns: Drucksachen-Nummer als String (z.B. ``"7/1234"``) oder ``None``. """ if not datum or not titel: return None # Titel-Substring: nur die ersten 40 Zeichen für den LIKE-Match verwenden, # da Poll-Labels und Assessment-Titel leicht voneinander abweichen können. titel_substr = titel.strip()[:40] from .config import settings as _settings import aiosqlite as _aio async with _aio.connect(_settings.db_path) as db: cur = await db.execute( """ SELECT drucksache FROM assessments WHERE bundesland = ? AND ABS(julianday(datum) - julianday(?)) < 14 AND LOWER(title) LIKE ? ORDER BY ABS(julianday(datum) - julianday(?)) LIMIT 1 """, (bundesland.upper(), datum, f"%{titel_substr.lower()}%", datum), ) row = await cur.fetchone() if row: logger.debug( "fallback_drucksache_by_date_title: %s/%s → %s", bundesland, datum, row[0], ) return row[0] return None async def fetch_polls(bundesland_code: str, limit: int = 100) -> list[dict]: """Holt aktuelle Abstimmungen für ein Bundesland von abgeordnetenwatch. Gibt eine Liste von Poll-Dicts zurück; jedes Dict enthält zusätzlich den geparsten Key ``drucksache`` (kann None sein). Args: bundesland_code: Unser BL-Code (z.B. "NRW", "BT", "BUND"). limit: Maximale Anzahl Polls; wird als range_end übergeben. Returns: Liste von Poll-Dicts mit den Feldern aus der API plus ``drucksache``. Raises: ValueError: Wenn der bundesland_code nicht in PARLIAMENT_ID ist. httpx.HTTPError: Bei Netzwerkproblemen. """ parliament_id = PARLIAMENT_ID.get(bundesland_code.upper()) if parliament_id is None: raise ValueError( f"Unbekannter BL-Code '{bundesland_code}'. " f"Bekannte Codes: {sorted(PARLIAMENT_ID.keys())}" ) async with httpx.AsyncClient(timeout=30.0) as client: # Zuerst aktuellen ParliamentPeriod für das Parlament holen — # /polls filtert nach field_legislature (period-id), NICHT parliament-id. pp_resp = await client.get( f"{_BASE}/parliament-periods", params={"parliament": parliament_id, "type": "legislature", "range_end": 5}, ) pp_resp.raise_for_status() periods = (pp_resp.json() or {}).get("data") or [] # Aktuelle Periode: sortiere nach start-date desc, nimm die neueste current = sorted( periods, key=lambda x: x.get("start_date_period") or "", reverse=True, ) if not current: logger.warning("Keine ParliamentPeriod für %s (parliament_id=%d)", bundesland_code, parliament_id) return [] period_id = current[0]["id"] # Polls für diese Periode resp = await client.get( f"{_BASE}/polls", params={"field_legislature": period_id, "range_end": limit}, ) resp.raise_for_status() data = resp.json() polls_raw: list[dict] = data.get("data") or [] polls = [] for p in polls_raw: intro_html = p.get("field_intro") or "" polls.append({ "id": p.get("id"), "label": p.get("label") or p.get("field_poll_date", ""), "field_poll_date": p.get("field_poll_date"), "field_accepted": p.get("field_accepted"), "field_topics": p.get("field_topics") or [], "field_intro": intro_html, "field_legislature": p.get("field_legislature") or {}, "drucksache": extract_drucksache_from_intro(intro_html), }) logger.info( "abgeordnetenwatch: %d polls für %s (parliament_id=%d)", len(polls), bundesland_code, parliament_id, ) return polls async def fetch_votes_for_poll(poll_id: int) -> list[dict]: """Holt namentliche Einzelstimmen für eine Abstimmung. Args: poll_id: ID der Abstimmung (aus polls[].id). Returns: Liste von Vote-Dicts mit den Feldern: poll_id, politician_id, politician_name, partei, vote. vote ist einer von: "yes", "no", "abstain", "no_show". Raises: httpx.HTTPError: Bei Netzwerkproblemen. """ # /votes?poll=X funktioniert (empirisch ermittelt); # NICHT field_poll (500) und NICHT /polls/{id}?related_data=votes # (liefert leeres related_data). Einfaches ?poll=. url = f"{_BASE}/votes" params = {"poll": poll_id, "range_end": 1000} async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.get(url, params=params) resp.raise_for_status() data = resp.json() votes_raw: list[dict] = data.get("data") or [] votes = [] for v in votes_raw: politician = v.get("mandate") or v.get("politician") or {} politician_id = politician.get("id") or v.get("mandate_id") politician_name = politician.get("label") or politician.get("name") or "" # Partei aus politician.party oder fraction partei = "" party = politician.get("party") or {} if isinstance(party, dict): partei = party.get("label") or party.get("short_label") or "" fraction = v.get("fraction") or {} if not partei and isinstance(fraction, dict): partei = fraction.get("full_name") or fraction.get("label") or "" vote_value = (v.get("vote") or "").lower() # API liefert "yes"/"no"/"abstain"/"no_show" — direkt übernehmen if vote_value not in ("yes", "no", "abstain", "no_show"): vote_value = "no_show" votes.append({ "poll_id": poll_id, "politician_id": politician_id, "politician_name": politician_name, "partei": partei, "vote": vote_value, }) logger.info( "abgeordnetenwatch: %d votes für poll_id=%d", len(votes), poll_id ) return votes