"""Parliament search adapters for different German states.""" import json import logging import httpx import re from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Optional from bs4 import BeautifulSoup logger = logging.getLogger(__name__) @dataclass class Drucksache: """A parliamentary document.""" drucksache: str # e.g. "18/8125" title: str fraktionen: list[str] datum: str # ISO date link: str # PDF URL bundesland: str typ: str = "Antrag" # Antrag, Anfrage, Beschlussempfehlung, etc. class ParlamentAdapter(ABC): """Base adapter for searching parliament documents.""" bundesland: str name: str @abstractmethod async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search for documents matching query.""" pass @abstractmethod async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Get a specific document by ID.""" pass @abstractmethod async def download_text(self, drucksache: str) -> Optional[str]: """Download and extract text from a document.""" pass class NRWAdapter(ParlamentAdapter): """Adapter for NRW Landtag (opal.landtag.nrw.de).""" bundesland = "NRW" name = "Landtag Nordrhein-Westfalen" base_url = "https://opal.landtag.nrw.de" search_url = "https://opal.landtag.nrw.de/home/dokumente/dokumentensuche/parlamentsdokumente/aktuelle-dokumente.html" def _parse_query(self, query: str) -> tuple[str, list[str], bool]: """ Parse search query for AND logic and exact phrases. 
Returns: (search_term_for_api, filter_terms, is_exact) Examples: - 'Klimaschutz Energie' -> ('Klimaschutz', ['klimaschutz', 'energie'], False) - '"Grüner Stahl"' -> ('Grüner Stahl', ['grüner stahl'], True) - 'Klimaschutz "erneuerbare Energie"' -> ('Klimaschutz', ['klimaschutz', 'erneuerbare energie'], False) """ query = query.strip() # Check for exact phrase (entire query in quotes) if query.startswith('"') and query.endswith('"') and query.count('"') == 2: exact = query[1:-1].strip() return (exact, [exact.lower()], True) # Extract quoted phrases and regular terms import shlex try: parts = shlex.split(query) except ValueError: # Fallback for unbalanced quotes parts = query.split() if not parts: return (query, [query.lower()], False) # Use first term for API search, all terms for filtering filter_terms = [p.lower() for p in parts] return (parts[0], filter_terms, False) def _matches_all_terms(self, doc: 'Drucksache', terms: list[str], is_exact: bool) -> bool: """Check if document matches all search terms (AND logic).""" searchable = f"{doc.title} {doc.drucksache} {' '.join(doc.fraktionen)} {doc.typ}".lower() if is_exact: # Exact phrase must appear return terms[0] in searchable else: # All terms must appear (AND) return all(term in searchable for term in terms) async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search NRW Landtag documents via OPAL portal.""" results = [] # Parse query for AND logic api_query, filter_terms, is_exact = self._parse_query(query) async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: # First, get the page to establish session initial = await client.get(self.search_url) if initial.status_code != 200: print(f"NRW search initial request failed: {initial.status_code}") return [] # Parse for webflow token from pagination links soup = BeautifulSoup(initial.text, 'html.parser') # Find a pagination link to extract the webflow token pagination_link = soup.select_one('a[href*="webflowexecution"]') 
webflow_token = "" webflow_execution = "" if pagination_link: href = pagination_link.get('href', '') # Extract webflowToken and webflowexecution from URL token_match = re.search(r'webflowToken=([^&]*)', href) exec_match = re.search(r'(webflowexecution[^=]+)=([^&]+)', href) if token_match: webflow_token = token_match.group(1) if exec_match: webflow_execution = f"{exec_match.group(1)}={exec_match.group(2)}" # Now perform the search with POST # Find the form action URL with webflow token form = soup.select_one('form#docSearchByItem') form_action = self.search_url if form and form.get('action'): action = form.get('action') if action.startswith('/'): form_action = f"{self.base_url}{action}" elif action.startswith('http'): form_action = action else: form_action = f"{self.search_url}?{action}" # Build form data for "Einfache Suche" (searchByItem form) form_data = { '_eventId_sendform': '1', 'dokNum': api_query, # This is the text search field 'formId': 'searchByItem', 'dokTyp': '', # All types 'wp': '18', # Wahlperiode 18 } # POST request with form data to the form action URL search_resp = await client.post( form_action, data=form_data, cookies=initial.cookies, headers={'Content-Type': 'application/x-www-form-urlencoded'} ) if search_resp.status_code != 200: print(f"NRW search request failed: {search_resp.status_code}") return [] # Parse results soup = BeautifulSoup(search_resp.text, 'html.parser') # Find all document result items (li elements containing articles) items = soup.select('li:has(article)') for item in items[:limit]: try: # Extract drucksache number from first link num_link = item.select_one('a[href*="MMD"]') if not num_link: continue href = num_link.get('href', '') # Extract number: MMD18-12345.pdf -> 18/12345 match = re.search(r'MMD(\d+)-(\d+)\.pdf', href) if not match: continue legislatur, nummer = match.groups() drucksache = f"{legislatur}/{nummer}" pdf_url = f"https://www.landtag.nrw.de{href}" if href.startswith('/') else href # Extract title from the 
title link (class e-document-result-item__title) title_elem = item.select_one('a.e-document-result-item__title') if title_elem: # Get text content, clean it up title = title_elem.get_text(strip=True) # Remove SVG icon text and clean title = re.sub(r'\s* Optional[Drucksache]: """Get document metadata by drucksache ID (e.g. '18/8125').""" # Parse legislatur and number match = re.match(r"(\d+)/(\d+)", drucksache) if not match: return None legislatur, nummer = match.groups() pdf_url = f"https://www.landtag.nrw.de/portal/WWW/dokumentenarchiv/Dokument/MMD{legislatur}-{nummer}.pdf" # Try to fetch and extract basic info async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: resp = await client.head(pdf_url) if resp.status_code == 200: return Drucksache( drucksache=drucksache, title=f"Drucksache {drucksache}", fraktionen=[], datum="", link=pdf_url, bundesland="NRW", ) except: pass return None async def download_text(self, drucksache: str) -> Optional[str]: """Download PDF and extract text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc: return None async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: return None # Extract text with PyMuPDF pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception as e: print(f"Error downloading {drucksache}: {e}") return None class PortalaAdapter(ParlamentAdapter): """Adapter for portala/eUI-based parliament documentation systems. 
    Used by parliaments running the proprietary "esearch" / portala framework
    (originally developed for STAR/StarFinder backends, now wrapped in a
    Single-Page App with Template Toolkit on the server side):

    - **LSA** (Sachsen-Anhalt) — PADOKA at
      ``padoka.landtag.sachsen-anhalt.de`` under ``/portal/`` (singular)
    - **BE** (Berlin) — PARDOK at ``pardok.parlament-berlin.de`` under
      ``/portala/`` (with the trailing 'a')

    Both instances share the same JSON action schema, only the base URL, the
    data source ID, the application path prefix and a few minor quirks
    differ — those are constructor parameters so that the same class can
    serve both states (and any future portala-based parliament).

    The search workflow is two-stage:

    1. ``POST {base}{path}/browse.tt.json`` with a complex JSON ``action``
       body that contains an Elasticsearch-style query tree under
       ``search.json``. The server returns a ``report_id`` plus hit count.
    2. ``POST {base}{path}/report.tt.html`` with
       ``{report_id, start, chunksize}`` to fetch the HTML hit list. Each hit
       carries a Perl Data::Dumper block in a ``<pre>`` tag with the
       canonical metadata.

    The query body schema was reverse-engineered from
    https://github.com/okfde/dokukratie/blob/main/dokukratie/scrapers/portala.query.json
    (GPL-3.0 — only structure/selectors are reused, not Python code).

    Full-text search is **not** implemented in the MVP: the adapter
    returns documents of the current Wahlperiode in the given date
    window, and the search query is applied as a client-side
    title/Urheber filter. The server-side full-text path requires
    state-specific ``sf`` index names that are not yet known.
    """

    def __init__(
        self,
        *,
        bundesland: str,
        name: str,
        base_url: str,
        db_id: str,
        wahlperiode: int,
        portala_path: str = "/portal",
        document_type: Optional[str] = "Antrag",
        pdf_url_prefix: str = "/files/",
        date_window_days: int = 730,
        typ_filter: Optional[str] = "DOKDBE",
        omit_date_filter: bool = False,
    ) -> None:
        """Configure a portala/eUI adapter for one specific parliament.

        Args:
            bundesland: state code (e.g. ``"LSA"``, ``"BE"``).
            name: human-readable adapter label (used in logs/UI).
            base_url: ``https://...`` of the portal host without trailing slash.
            db_id: data source identifier the eUI server expects in
                ``action.sources``, e.g. ``"lsa.lissh"`` or ``"lah.lissh"``.
            wahlperiode: current legislative period — fed into the WP
                term of the search tree.
            portala_path: path prefix where the portala app lives. ``/portal``
                for LSA, ``/portala`` for Berlin.
            document_type: optional filter applied via ETYPF/DTYPF/DART
                terms. ``"Antrag"`` works for LSA; for instances where
                the index uses different document_type values (e.g. Berlin),
                pass ``None`` to drop the document_type subtree entirely
                — the user can still filter client-side by title.
            pdf_url_prefix: URL fragment between ``base_url`` and the
                relative PDF path returned by the server.
            date_window_days: how many days back ``search()`` looks by
                default.
            typ_filter: ``TYP=`` term in the parsed string and
                JSON tree. ``DOKDBE`` works for LSA/BE/BB/BW (the
                lissh-style instances). For Hessen (``hlt.lis``) and
                similar instances the value is different or absent —
                pass ``None`` to drop the term entirely.
        """
        self.bundesland = bundesland
        self.name = name
        self.base_url = base_url.rstrip("/")
        self.db_id = db_id
        self.wahlperiode = wahlperiode
        self.portala_path = "/" + portala_path.strip("/")
        self.document_type = document_type
        self.pdf_url_prefix = "/" + pdf_url_prefix.strip("/") + "/"
        self.date_window_days = date_window_days
        self.typ_filter = typ_filter
        self.omit_date_filter = omit_date_filter

    # ── LSA-style hit list (Perl Data::Dumper inside <pre> blocks) ──
    # Reverse-engineered "WEV*" record fields of one Perl-dump record:
    # WEV06.main = title
    # WEV32.5    = relative PDF path
    # WEV32.main = "Antrag <originator> <DD.MM.YYYY> Drucksache X/YYYY ..."
    #
    # Title: the 'main' value of the first WEV06 entry. The value may be
    # single- or double-quoted; the capture stops at the first quote
    # character of either kind, so a title that contains a quote character
    # is captured only up to that character.
    _RE_TITLE = re.compile(r"'WEV06'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
    # Relative PDF path: any single-quoted value under key '5' that ends in
    # ".pdf". The pattern itself is not anchored to WEV32.
    _RE_PDF = re.compile(r"'5'\s*=>\s*'([^']*\.pdf)'")
    # Drucksachen number "<Wahlperiode>/<Nummer>" following the literal
    # word "Drucksache".
    _RE_DRUCKSACHE = re.compile(r"Drucksache\s*(\d+/\d+)")
    # Originator (group 1) and German-format date (group 2) from the 'main'
    # value of the first WEV32 entry. The literal "Antrag" prefix is part of
    # the pattern, so records of any other document type yield no match
    # here. ``[^}]*`` means no closing brace may occur between the opening
    # ``{`` and the 'main' key.
    _RE_URHEBER_DATUM = re.compile(
        r"'WEV32'\s*=>\s*\[\s*\{[^}]*'main'\s*=>\s*[\"']Antrag\s+(.+?)\s+(\d{1,2}\.\d{1,2}\.\d{4})\s+Drucksache",
    )
    _RE_PRE_BLOCK = re.compile(r'
\$VAR1 = (.*?)
', re.DOTALL) # ── Berlin-style hit list (production HTML cards, no Perl dump) ── # The whole div for one record: _RE_BE_RECORD = re.compile( r']*class="[^"]*efxRecordRepeater[^"]*"[^>]*data-efx-rec="[^"]*"[^>]*>(.*?)(?=]*efxRecordRepeater|]*id="efxResultsEnd"||$)', re.DOTALL, ) _RE_BE_TITLE = re.compile(r']*class="h5[^"]*"[^>]*>\s*([^<]+)') _RE_BE_LINK = re.compile(r']*href="([^"]+\.pdf)"[^>]*>') # The metadata h6 looks like: # Antrag (Eilantrag)  Drucksache 19/3104 S. 1 bis 24 vom 31.03.2026 _RE_BE_DRUCKSACHE = re.compile(r'Drucksache\s+(\d+/\d+)') # BE has "Drucksache 19/3104 S. 1 bis 24 vom 31.03.2026" — date is # marked by ``vom``. BB has the BE card format too but writes the # date BEFORE the Drucksachen-Nummer with no marker: # "Antrag Reinhard Simon (BSW) 17.10.2024 Drucksache 8/2 (1 S.)". # Try ``vom``-prefix first; fall back to the first plain date. _RE_BE_DATUM_VOM = re.compile(r'vom\s+(\d{1,2}\.\d{1,2}\.\d{4})') _RE_BE_DATUM_PLAIN = re.compile(r'(\d{1,2}\.\d{1,2}\.\d{4})') _RE_BE_DOCTYPE = re.compile(r'\s*([^<&]+?)(?: |<)') @staticmethod def _decode_perl_hex(s: str) -> str: """Decode \\x{abcd} escape sequences from Perl Data::Dumper output.""" return re.sub(r'\\x\{([0-9a-f]+)\}', lambda m: chr(int(m.group(1), 16)), s) def _normalize_fraktion(self, urheber: str) -> list[str]: """Thin shim — die ganze Regex-Logik lebt jetzt zentral in ``app.parteien.extract_fraktionen`` (siehe #55). ``self.bundesland`` wird mitgegeben, damit FW-Familien-Aliase korrekt disambiguiert werden. """ from .parteien import extract_fraktionen return extract_fraktionen(urheber, bundesland=self.bundesland) def _build_search_body( self, wahlperiode: int, start_date: str, end_date: str, ) -> dict: """Build the action JSON body for browse.tt.json. The schema is taken from dokukratie's portala.query.json template and only differs in the data source and the variable substitutions. 
When ``self.document_type`` is None, the ETYPF/DTYPF/DART subtree is dropped — useful for parliaments whose ETYPF index uses different value strings than ``"Antrag"``. """ document_type = self.document_type date_range_text = f"{start_date} THRU {end_date}" date_term = lambda sf, num: { # noqa: E731 — local helper "tn": "trange", "sf": sf, "op": "eq", "num": num, "idx": 119, "l": 3, "p1": start_date, "t1": start_date, "p2": end_date, "t2": end_date, "t": date_range_text, } # Build the search.lines (form-state mirror) and the json tree lines: dict = { "2": str(wahlperiode), "10": start_date, "11": end_date, "20.1": "alWEBBI", "20.2": "alWEBBI", "20.3": "alWEBBI", "90.1": "AND", "90.2": "AND", "90.3": "AND", } if document_type is not None: lines["3"] = document_type lines["4"] = "D" # Top-level AND tree top_terms: list = [ {"tn": "term", "t": str(wahlperiode), "idx": 6, "l": 3, "sf": "WP", "op": "eq", "num": 5}, ] if document_type is not None: top_terms.append({"tn": "or", "num": 3, "terms": [ {"tn": "or", "num": 4, "terms": [ {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "ETYPF", "op": "eq", "num": 10}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "ETYP2F", "op": "eq", "num": 11}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "DTYPF", "op": "eq", "num": 12}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "DTYP2F", "op": "eq", "num": 13}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "1VTYPF", "op": "eq", "num": 14}, ]}, {"tn": "or", "num": 15, "terms": [ {"tn": "term", "t": '"D"', "idx": 93, "l": 4, "sf": "DART", "op": "eq", "num": 16}, {"tn": "term", "t": '"D"', "idx": 93, "l": 4, "sf": "DARTS", "op": "eq", "num": 17}, ]}, ]}) if not self.omit_date_filter: top_terms.append({"tn": "or", "num": 18, "terms": [ {"tn": "or", "num": 19, "terms": [ date_term("DAT", 20), date_term("DDAT", 21), ]}, date_term("SDAT", 22), ]}) if self.typ_filter is not None: 
top_terms.append({"tn": "term", "t": self.typ_filter, "idx": 156, "l": 1, "sf": "TYP", "op": "eq", "num": 23}) # Mirror the same shape into the parsed/sref display strings typ_clause = f" AND TYP={self.typ_filter}" if self.typ_filter is not None else "" date_clause = ( f" AND (DAT,DDAT,SDAT= {date_range_text})" if not self.omit_date_filter else "" ) if document_type is not None: parsed = ( f"((/WP {wahlperiode}) AND " f"(/ETYPF,ETYP2F,DTYPF,DTYP2F,1VTYPF (\"{document_type}\")) " f"AND (/DART,DARTS (\"D\")){date_clause}){typ_clause}" ) else: parsed = f"((/WP {wahlperiode}){date_clause}){typ_clause}" return { "action": "SearchAndDisplay", "sources": [self.db_id], "report": { "rhl": "main", "rhlmode": "add", "format": "generic1-full", "mime": "html", "sort": "WEVSO1/D WEVSO2 WEVSO3", }, "search": { "lines": lines, "serverrecordname": "sr_generic1", "parsed": parsed, "sref": parsed, "json": [{ "tn": "and", "num": 1, "terms": top_terms, }], }, "dataSet": "1", } @staticmethod def _datum_de_to_iso(datum_de: str) -> str: """Convert DD.MM.YYYY → YYYY-MM-DD; return '' for empty input.""" if not datum_de: return "" d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" def _parse_hit_list_html(self, html: str, query_filter: str = "") -> list[Drucksache]: """Extract Drucksachen from a report.tt.html response. Two formats are supported and auto-detected: - **LSA-style:** the records are embedded as Perl Data::Dumper dumps inside ``
$VAR1 = …
`` blocks. WEV06 → title, WEV32 → metadata + PDF path. Used by Sachsen-Anhalt's PADOKA template. - **Berlin-style:** standard production HTML cards with ``efxRecordRepeater`` divs. Title in an ``

``, metadata + PDF link in an ````. Used by Berlin's PARDOK template. """ if self._RE_PRE_BLOCK.search(html): return self._parse_hit_list_dump(html, query_filter) return self._parse_hit_list_cards(html, query_filter) def _parse_hit_list_dump(self, html: str, query_filter: str) -> list[Drucksache]: """Parse LSA-style ``
$VAR1 = …
`` Perl-dump records.""" results: list[Drucksache] = [] for pre in self._RE_PRE_BLOCK.findall(html): m_ds = self._RE_DRUCKSACHE.search(pre) if not m_ds: continue drucksache = m_ds.group(1) m_t = self._RE_TITLE.search(pre) title = self._decode_perl_hex(m_t.group(1)) if m_t else f"Drucksache {drucksache}" m_pdf = self._RE_PDF.search(pre) pdf_rel = m_pdf.group(1) if m_pdf else "" pdf_url = f"{self.base_url}{self.pdf_url_prefix}{pdf_rel}" if pdf_rel else "" m_w32 = self._RE_URHEBER_DATUM.search(pre) urheber = self._decode_perl_hex(m_w32.group(1).strip()) if m_w32 else "" datum_iso = self._datum_de_to_iso(m_w32.group(2) if m_w32 else "") fraktionen = self._normalize_fraktion(urheber) if urheber else [] doc = Drucksache( drucksache=drucksache, title=title, fraktionen=fraktionen, datum=datum_iso, link=pdf_url, bundesland=self.bundesland, typ="Antrag", ) if query_filter: hay = f"{title} {urheber}".lower() if not all(t in hay for t in query_filter.lower().split()): continue results.append(doc) return results def _parse_hit_list_cards(self, html: str, query_filter: str) -> list[Drucksache]: """Parse Berlin-style ``efxRecordRepeater`` HTML-card records. Each card contains an ``

`` title, a metadata ```` with the document type, the Drucksachen-Nummer, and the date, plus a direct ```` link to the PDF on the same host. """ results: list[Drucksache] = [] # Split the HTML on every record-div opener — easier than balancing # divs with regex. chunks = html.split('class="record') # First chunk is the prelude, skip it for chunk in chunks[1:]: # Each chunk now starts at the record class attribute m_t = self._RE_BE_TITLE.search(chunk) title = m_t.group(1).strip() if m_t else "Ohne Titel" m_ds = self._RE_BE_DRUCKSACHE.search(chunk) if not m_ds: continue drucksache = m_ds.group(1) m_pdf = self._RE_BE_LINK.search(chunk) pdf_url = "" if m_pdf: href = m_pdf.group(1) if href.startswith("http://") or href.startswith("https://"): pdf_url = href elif href.startswith("/"): pdf_url = f"{self.base_url}{href}" else: pdf_url = f"{self.base_url}{self.pdf_url_prefix}{href}" m_dat = self._RE_BE_DATUM_VOM.search(chunk) or self._RE_BE_DATUM_PLAIN.search(chunk) datum_iso = self._datum_de_to_iso(m_dat.group(1) if m_dat else "") m_doc = self._RE_BE_DOCTYPE.search(chunk) doctype_full = m_doc.group(1).strip() if m_doc else "Drucksache" # Berlin often packs the originator(s) into the same h6 line: # "Antrag CDU, SPD" → fraktionen = ["CDU","SPD"], typ = "Antrag" # Senat-Vorlagen carry no fraction, only "Vorlage zur …". fraktionen = self._normalize_fraktion(doctype_full) # Strip the fraction names back out of the typ string so the UI # shows a clean "Antrag" / "Vorlage …" label. 
typ = doctype_full if fraktionen: # Cut at the first occurrence of any party name cuts = [typ.upper().find(f.upper()) for f in fraktionen] cuts = [c for c in cuts if c >= 0] if cuts: typ = typ[: min(cuts)].rstrip(" ,") doc = Drucksache( drucksache=drucksache, title=title, fraktionen=fraktionen, datum=datum_iso, link=pdf_url, bundesland=self.bundesland, typ=typ, ) if query_filter: hay = f"{title} {doctype_full}".lower() if not all(t in hay for t in query_filter.lower().split()): continue results.append(doc) return results async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search recent documents of the current Wahlperiode. ``query`` is applied as a client-side title/Urheber filter; the server-side query covers the configured ``date_window_days`` (default 24 months). """ from datetime import date, timedelta end = date.today() start = end - timedelta(days=self.date_window_days) body = self._build_search_body( wahlperiode=self.wahlperiode, start_date=start.isoformat(), end_date=end.isoformat(), ) browse_html = f"{self.base_url}{self.portala_path}/browse.tt.html" browse_json = f"{self.base_url}{self.portala_path}/browse.tt.json" report_html = f"{self.base_url}{self.portala_path}/report.tt.html" async with httpx.AsyncClient( # Bumped from 30s for #13 quick-win: chunksize=500 against the # LSA report.tt.html endpoint occasionally takes 30+ seconds. 
timeout=60, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: # Step 1: warm up cookies via the browse page await client.get(browse_html) # Step 2: submit the search action resp = await client.post( browse_json, json=body, headers={"Referer": browse_html}, ) if resp.status_code != 200: logger.error("%s search HTTP %s", self.bundesland, resp.status_code) return [] data = resp.json() report_id = data.get("report_id") if not report_id: logger.error("%s: no report_id in response: %s", self.bundesland, data) return [] # Step 3: fetch the HTML hit list # Take a generous chunk so der client-side type-filter # genug Material zum Filtern hat. Berlin-PARDOK ist # dominiert von "Schriftliche Anfrage"-Hits und ohne # server-side ETYPF-Filter (BE: document_type=None) liefern # 100 Roh-Hits oft nur 1-2 Anträge. Floor bewusst hoch. # Quick-win für #13 + #61 Bug 5. chunksize = max(limit * 10, 1500) report_resp = await client.post( report_html, json={"report_id": report_id, "start": 0, "chunksize": chunksize}, headers={"Referer": browse_html}, ) if report_resp.status_code != 200: logger.error("%s report HTTP %s", self.bundesland, report_resp.status_code) return [] results = self._parse_hit_list_html(report_resp.text, query_filter=query) # Server-side ETYPF/DTYPF filter is best-effort across portala # instances — BB/RP let "Kleine Anfrage" und "Beschluss- # empfehlung" durch, BE hat sogar `document_type=None` # (eigene ETYPF-Werte), wodurch "Schriftliche Anfrage" das # 200-Result-Window aushungern und Anträge wie 19/2650 nie # zurückkommen. Wir filtern client-side IMMER auf # "antrag"-Substring im typ — unabhängig davon, ob der # Server-Filter gesetzt war (siehe #61 Bug 2, 3, 5). 
results = [ d for d in results if "antrag" in (d.typ or "").lower() ] return results[:limit] except Exception: logger.exception("%s search error", self.bundesland) return [] async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Look up a single document by ID via the search endpoint with a document_number filter.""" # Pragmatic MVP: do a broad search and filter for the requested ID. # A targeted single-document fetch would require a different # action.search.json structure that we have not reverse-engineered yet. results = await self.search(query="", limit=200) for doc in results: if doc.drucksache == drucksache: return doc return None async def download_text(self, drucksache: str) -> Optional[str]: """Download the PDF for a Drucksache and extract its text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc or not doc.link: return None async with httpx.AsyncClient( timeout=60, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: return None pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception: logger.exception("%s download error for %s", self.bundesland, drucksache) return None class ParLDokAdapter(ParlamentAdapter): """Adapter for ParlDok 8.x parliament documentation systems (J3S GmbH). ParlDok is a proprietary parliament documentation product by J3S GmbH (https://www.j3s.de). Different from the portala/eUI framework used by LSA/BE: ParlDok 8.x is a single-page app whose backend is a JSON API rooted at ``{base_url}{prefix}/Fulltext/...``. The legacy ParLDok 5.x HTML POST form (``parldok/formalkriterien``) used by dokukratie's MV YAML scraper has been deprecated by the LandtagMV upgrade to 8.3.5. 
Confirmed instances using this engine (April 2026): - **MV** (Mecklenburg-Vorpommern) — ``dokumentation.landtag-mv.de/parldok`` - HH, SN, TH all advertise ParlDok in dokukratie but their actual versions/themes have not been verified yet. Search workflow: 1. ``GET {base_url}{prefix}/`` to obtain the session cookie. The backend rejects POSTs without it. 2. ``POST {base_url}{prefix}/Fulltext/Search`` with form-encoded ``data=`` payload. The JSON carries a ``tags`` array of facet selections; each tag is ``{"type": , "id": }``. Reverse-engineered facet type constants from the bundle.js (``pd.facet_*``): - ``facet_fraction = 2`` - ``facet_kind = 7`` (Drucksache, Plenarprotokoll, …) - ``facet_type = 8`` (Antrag, Gesetzentwurf, Kleine Anfrage, …) - ``facet_lp = 10`` (Wahlperiode) Response is JSON ``{success, data: }`` where the inner ``data`` carries ``{count, docs: [{id, title, date, authorhtml, kind, type, lp, number, link, ...}], ...}``. 3. PDF download: ``GET {base_url}{prefix}/dokument/{numeric_id}``. Returns ``application/pdf`` directly. The ``link`` field returned by the search API already contains the path fragment ``/dokument/#navpanes=0`` — strip the fragment and prepend the configured ``prefix``. Drucksachen-Nummer is reconstructed as ``f"{lp}/{number}"`` from the search hit. Full-text search is *not* implemented in this MVP — the backend supports it via ``facet_fulltext = 0`` tags but the public LP-only filter already returns the relevant Antrag pool. ``query`` is applied as a client-side title/Urheber filter. """ # Reverse-engineered facet type constants from bundle.js (pd.facet_*). FACET_FULLTEXT = 0 FACET_FRACTION = 2 FACET_KIND = 7 FACET_TYPE = 8 FACET_LP = 10 def __init__( self, *, bundesland: str, name: str, base_url: str, wahlperiode: int, prefix: str = "/parldok", document_typ: str = "Antrag", document_typ_substring: bool = False, kinds: Optional[list[str]] = None, ) -> None: """Configure a ParlDok 8.x adapter for one specific parliament. 
Args: bundesland: state code, e.g. ``"MV"``. name: human-readable label. base_url: ``https://...`` host root, no trailing slash. wahlperiode: current legislative period — fed into the ``facet_lp`` tag of the search payload. prefix: app prefix where ParlDok lives. ``/parldok`` for MV. document_typ: client-side filter on the ``type`` field of each hit ("Antrag", "Gesetzentwurf", …). Set to empty string to disable type filtering. document_typ_substring: if True, ``document_typ`` is matched as a substring against the hit's ``type`` field instead of an exact match. Needed for instances where the Drucksachen-Anträge live under composite type strings like ``"Antrag gemäß § 79 GO"`` (Thüringen) — strict ``"Antrag"`` would never match. kinds: optional list of acceptable ``kind`` values. Defaults to ``["Drucksache"]`` if None — but TH packs its Anträge under ``kind="Vorlage"`` so the parameter has to be widened there. """ self.bundesland = bundesland self.name = name self.base_url = base_url.rstrip("/") self.prefix = "/" + prefix.strip("/") self.wahlperiode = wahlperiode self.document_typ = document_typ self.document_typ_substring = document_typ_substring self.kinds = kinds if kinds is not None else ["Drucksache"] def _hit_matches_filters(self, hit: dict) -> bool: """Apply the kind/typ filters to a raw hit dict. Centralised so the search loop can short-circuit cleanly. ``hit`` comes from ``Fulltext/Search`` or ``Fulltext/Resultpage`` JSON responses; both share the same record schema. 
""" if self.kinds and hit.get("kind") not in self.kinds: return False hit_type = (hit.get("type") or "").strip() if self.document_typ: if self.document_typ_substring: if self.document_typ not in hit_type: return False else: if hit_type != self.document_typ: return False return True @staticmethod def _datum_de_to_iso(datum_de: str) -> str: """DD.MM.YYYY → YYYY-MM-DD; '' for empty input.""" if not datum_de: return "" try: d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" except ValueError: return "" def _normalize_fraktion(self, authorhtml: str) -> list[str]: """Thin shim — siehe ``app.parteien.extract_fraktionen``. #55.""" from .parteien import extract_fraktionen return extract_fraktionen(authorhtml, bundesland=self.bundesland) @staticmethod def _fulltext_id(term: str) -> str: """Sanitize a search term to ParlDok's facet ID format. Mirrors ``pd.getFulltextId`` from ``bundle.js``: replace every non-alphanumeric character with ``-``. The server uses this to deduplicate identical search facets. """ return re.sub(r"[^a-zA-Z0-9]", "-", term) def _build_search_body(self, *, length: int = 100, query: str = "") -> dict: """Build the JSON payload for the initial ``Fulltext/Search`` call. Filters by Wahlperiode only — type/kind/fulltext filtering all happen client-side after the hit list is paginated. The ``query`` parameter is accepted for API compatibility but is currently NOT forwarded to the server (#18: einheitliche client-side Title-Suche, kein Server-Volltext, weil das Verhalten zwischen Adaptern sonst asymmetrisch wird). 
The ``FACET_FULLTEXT`` constant and :meth:`_fulltext_id` helper are kept around as documentation for the previous #12 server-side variant — when fulltext gets uniformly re-introduced later, the dormant tag is just:: {"type": self.FACET_FULLTEXT, "id": self._fulltext_id(query), "fulltext": query, "label": query, "field": "Alle"} Pagination beyond the first page goes through ``Fulltext/Resultpage`` — the ``Search`` endpoint itself ignores any non-zero ``Start``. """ del query # explicitly unused — see docstring tags: list[dict] = [{"type": self.FACET_LP, "id": self.wahlperiode}] return { "devicekey": "", "max": length, "withfilter": False, # sort=2 → newest first (date desc); sort=1 is relevance. "sort": 2, "topk": length, "llm": 0, "newdocsearch": False, "limit": {"Start": 0, "Length": length}, "tags": tags, "updateFilters": [], } def _hit_to_drucksache(self, hit: dict) -> Optional[Drucksache]: """Convert one ParlDok JSON hit to a Drucksache. None if unusable. ParlDok markiert frische Vorlagen mit leerem ``link``/``prelink`` wenn das PDF noch nicht freigegeben ist (z.B. TH 8/1594, datum 2026-03-31, ``allowed: false``). Solche Hits sind für unsere Pipeline wertlos — `download_text` würde an `not doc.link` scheitern und das Frontend würde einen unklickbaren Eintrag anzeigen. Sauberer Skip an dieser Stelle. Issue #61, Bug 1. """ lp = hit.get("lp") number = hit.get("number") if not lp or not number: return None link_field = hit.get("link") or hit.get("prelink") or "" if not link_field: return None # Strip "#navpanes=0" fragment and prepend the prefix. 
async def _post_json(
    self,
    client: httpx.AsyncClient,
    endpoint: str,
    payload: dict,
) -> Optional[dict]:
    """POST ``payload`` to a ParlDok endpoint and unwrap the reply.

    The payload is JSON-stringified and sent as the form field ``data``.

    Args:
        client: Open HTTP client to send the request with.
        endpoint: Path tail, e.g. ``"Fulltext/Search"`` or
            ``"Fulltext/Resultpage"``.
        payload: Request body to serialize.

    Returns:
        The inner JSON object parsed from the reply's stringified ``data``
        field. ``None`` on a non-200 status, a reply without ``success``, or
        any exception (all of which are logged).
    """
    root = f"{self.base_url}{self.prefix}/"
    target = f"{root}{endpoint}"
    try:
        response = await client.post(
            target,
            data={"data": json.dumps(payload, ensure_ascii=False)},
            headers={
                "X-Requested-With": "XMLHttpRequest",
                "Referer": root,
            },
        )
        if response.status_code == 200:
            envelope = response.json()
            if envelope.get("success"):
                return json.loads(envelope["data"])
            logger.error(
                "%s %s not successful: %s",
                self.bundesland, endpoint, envelope.get("message"),
            )
        else:
            logger.error(
                "%s %s HTTP %s",
                self.bundesland, endpoint, response.status_code,
            )
        return None
    except Exception:
        logger.exception("%s ParlDok %s error", self.bundesland, endpoint)
        return None


async def _initial_search(
    self,
    client: httpx.AsyncClient,
    *,
    length: int,
) -> tuple[Optional[int], list[dict]]:
    """Run the initial ``Fulltext/Search`` and return ``(queryid, docs)``.

    The ``queryid`` is what later ``Fulltext/Resultpage`` calls refer to.
    ParlDok ignores any non-zero ``Start`` on this endpoint, so only the
    first page of hits is reachable through ``Search``.

    Returns:
        ``(None, [])`` when the request produced no usable reply.
    """
    inner = await self._post_json(
        client, "Fulltext/Search", self._build_search_body(length=length)
    )
    if inner:
        return inner.get("queryid"), (inner.get("docs") or [])
    return None, []


async def _result_page(
    self,
    client: httpx.AsyncClient,
    *,
    queryid: int,
    start: int,
    length: int,
) -> list[dict]:
    """Fetch one further result page via ``Fulltext/Resultpage``.

    Returns:
        The page's ``docs`` list; empty when the request failed or the
        reply carried no docs.
    """
    inner = await self._post_json(
        client,
        "Fulltext/Resultpage",
        {
            "devicekey": "",
            "queryid": queryid,
            "limit": {"Start": start, "Length": length},
        },
    )
    return (inner.get("docs") or []) if inner else []


def _make_client(self) -> httpx.AsyncClient:
    """Create the HTTP client used for all ParlDok requests."""
    user_agent = {"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}
    return httpx.AsyncClient(timeout=30, follow_redirects=True, headers=user_agent)


async def _paginated_hits(self, client: httpx.AsyncClient):
    """Yield raw hit dicts across result pages, in server order.

    The first batch comes from ``Fulltext/Search``, later batches from
    ``Fulltext/Resultpage`` using the queryid of the initial call. The
    iteration ends when a page comes back empty or shorter than
    :attr:`PAGE_SIZE`, when no queryid was returned, or after
    :attr:`MAX_PAGES` pages in total.
    """
    page_size = self.PAGE_SIZE
    queryid, batch = await self._initial_search(client, length=page_size)
    for hit in batch:
        yield hit
    if not queryid or len(batch) < page_size:
        return

    last_page = self.MAX_PAGES
    page = 1
    while page < last_page:
        batch = await self._result_page(
            client,
            queryid=queryid,
            start=page * page_size,
            length=page_size,
        )
        if not batch:
            return
        for hit in batch:
            yield hit
        if len(batch) < page_size:
            return
        page += 1
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
    """Search the configured Wahlperiode, newest first.

    #18, uniform behaviour: the server only filters by Wahlperiode; the
    client pages through the hits and filters locally for matches in title
    or originator. The full-text filter (#12) was removed because adapter
    behaviour would otherwise be asymmetric. Ordering comes from the server
    (newest first through ``sort=2`` in :meth:`_build_search_body`).

    Hits are deduplicated by ``lp/number`` because ParlDok returns the same
    Drucksache several times under different proceedings.

    Args:
        query: Whitespace-separated terms; every term must occur
            (case-insensitively) in the title or the raw ``authorhtml``.
            Empty means no text filter.
        limit: Maximum number of documents to return.

    Returns:
        Up to ``limit`` documents. Empty for a non-positive ``limit`` or
        when the homepage request fails.
    """
    if limit <= 0:
        # The loop appends before it checks the limit, so without this
        # guard a non-positive limit would still return one hit.
        return []

    results: list[Drucksache] = []
    seen: set[str] = set()
    query_terms = [t.lower() for t in query.split()] if query else []

    async with self._make_client() as client:
        try:
            # NOTE(review): presumably establishes the session before the
            # JSON endpoints are called — confirm against the live server.
            await client.get(f"{self.base_url}{self.prefix}/")
        except httpx.HTTPError:
            # Sibling adapters log and return [] on transport errors.
            logger.exception("%s ParlDok homepage request error", self.bundesland)
            return []

        async for hit in self._paginated_hits(client):
            if not self._hit_matches_filters(hit):
                continue
            doc = self._hit_to_drucksache(hit)
            if not doc or doc.drucksache in seen:
                continue
            seen.add(doc.drucksache)
            if query_terms:
                hay = f"{doc.title} {hit.get('authorhtml', '')}".lower()
                if not all(t in hay for t in query_terms):
                    continue
            results.append(doc)
            if len(results) >= limit:
                break
    return results


async def get_document(self, drucksache: str) -> Optional[Drucksache]:
    """Look up a single Drucksache by its ``lp/number`` ID.

    Pragmatic MVP: page through the Wahlperiode unfiltered until a match
    turns up. ParlDok offers a ``facet_number`` (14) facet that would allow
    a direct lookup, but its ID values are instance-specific (they would
    need a ``Fulltext/Filter`` discovery call), and WP-wide pagination is
    fast enough for the typical 2k–10k Drucksachen per period.

    Returns:
        The converted hit, or ``None`` when the ID has no number part, the
        homepage request fails, no hit matches, or the matching hit cannot
        be converted.
    """
    wanted_lp, _, wanted_num = drucksache.partition("/")
    if not wanted_num:
        return None

    async with self._make_client() as client:
        try:
            await client.get(f"{self.base_url}{self.prefix}/")
        except httpx.HTTPError:
            logger.exception("%s ParlDok homepage request error", self.bundesland)
            return None

        async for hit in self._paginated_hits(client):
            # No doc-type filters here: get_document looks up arbitrary
            # Drucksachen, including ones whose kind/typ does not match
            # the search-time filter.
            if str(hit.get("lp")) == wanted_lp and str(hit.get("number")) == wanted_num:
                return self._hit_to_drucksache(hit)
    return None


async def download_text(self, drucksache: str) -> Optional[str]:
    """Download the PDF for a Drucksache and extract its text.

    Returns:
        The concatenated text of all pages, or ``None`` when the document
        is unknown, has no link, the download does not return HTTP 200, or
        any error occurs (errors are logged).
    """
    import fitz  # PyMuPDF

    doc = await self.get_document(drucksache)
    if not doc or not doc.link:
        return None

    async with httpx.AsyncClient(
        timeout=60,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
    ) as client:
        try:
            resp = await client.get(doc.link)
            if resp.status_code != 200:
                logger.error(
                    "%s PDF HTTP %s for %s (%s)",
                    self.bundesland, resp.status_code, drucksache, doc.link,
                )
                return None
            pdf = fitz.open(stream=resp.content, filetype="pdf")
            try:
                return "".join(page.get_text() for page in pdf)
            finally:
                # Release the document even when text extraction raises.
                pdf.close()
        except Exception:
            logger.exception(
                "%s ParlDok download error for %s", self.bundesland, drucksache
            )
            return None
Format details: - URL template: ``{base}/cgi-bin/starfinder/0?path={db_path}&id=FASTLINK &pass=&search={starfinder_query}&format=WEBKURZFL`` - Query syntax: ``WP=20+AND+dtyp=antrag`` (URL-encoded). The ``dtyp`` codes are lowercase short labels (``antrag``, ``kleine``). - Encoding: ``iso-8859-1`` (Latin-1) — NOT UTF-8. The HTTP response doesn't always declare it via Content-Type, so we explicitly decode with ``latin1`` to avoid mojibake on the German umlauts. - Hit-format: each record is one ```` with the title in ````, then ``Antrag Drucksache XX/YYYY``. """ _RE_RECORD = re.compile( r'.*?', re.DOTALL, ) _RE_TITLE = re.compile(r"(.*?)", re.DOTALL) _RE_DRUCKSACHE_LINK = re.compile( r']*>(\d+/\d+)' ) # The line between title and the -link looks like: # "Antrag Christian Dirschauer (SSW) 07.04.2026 Drucksache " # We pull the originator(s) and the date out of it. _RE_URHEBER_DATUM = re.compile( r"\s*
\s*[A-Za-zÄÖÜäöüß]+\s+(.+?)\s+(\d{1,2}\.\d{1,2}\.\d{4})\s+Drucksache", re.DOTALL, ) def __init__( self, *, bundesland: str, name: str, base_url: str, wahlperiode: int, db_path: str = "lisshfl.txt", document_typ_code: str = "antrag", ) -> None: self.bundesland = bundesland self.name = name self.base_url = base_url.rstrip("/") self.wahlperiode = wahlperiode self.db_path = db_path self.document_typ_code = document_typ_code @staticmethod def _datum_de_to_iso(datum_de: str) -> str: if not datum_de: return "" try: d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" except ValueError: return "" def _normalize_fraktion(self, text: str) -> list[str]: """Thin shim — siehe ``app.parteien.extract_fraktionen``. #55. SH-spezifisch: SSW gehört zur SH-Tabelle und wird durch ``bundesland=SH`` korrekt mit-extrahiert. """ from .parteien import extract_fraktionen return extract_fraktionen(text, bundesland=self.bundesland) def _build_url(self) -> str: """Build the Starfinder URL for the structural WP+dtyp browse. Free-text filtering is done client-side on the parsed records (consistent with #18 — alle Adapter machen einheitlich Title- Filter ohne Server-Volltext, weil das Verhalten zwischen Adaptern sonst asymmetrisch wird). 
""" search_param = f"WP={self.wahlperiode}+AND+dtyp={self.document_typ_code}" return ( f"{self.base_url}/cgi-bin/starfinder/0" f"?path={self.db_path}&id=FASTLINK&pass=&search={search_param}" f"&format=WEBKURZFL" ) def _parse_records(self, html: str) -> list[Drucksache]: results: list[Drucksache] = [] for record_html in self._RE_RECORD.findall(html): m_link = self._RE_DRUCKSACHE_LINK.search(record_html) if not m_link: continue pdf_url, drucksache = m_link.group(1), m_link.group(2) m_title = self._RE_TITLE.search(record_html) title = re.sub(r"\s+", " ", m_title.group(1)).strip() if m_title else f"Drucksache {drucksache}" urheber = "" datum_iso = "" m_meta = self._RE_URHEBER_DATUM.search(record_html) if m_meta: urheber = m_meta.group(1).strip() datum_iso = self._datum_de_to_iso(m_meta.group(2)) results.append(Drucksache( drucksache=drucksache, title=title, fraktionen=self._normalize_fraktion(urheber), datum=datum_iso, link=pdf_url, bundesland=self.bundesland, typ="Antrag", )) return results async def search(self, query: str, limit: int = 20) -> list[Drucksache]: url = self._build_url() async with httpx.AsyncClient( timeout=60, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: resp = await client.get(url) if resp.status_code != 200: logger.error("%s search HTTP %s", self.bundesland, resp.status_code) return [] # Force latin1 because the Starfinder server doesn't always # advertise the encoding correctly. html = resp.content.decode("latin-1", errors="replace") results = self._parse_records(html) except Exception: logger.exception("%s search error", self.bundesland) return [] # Client-side title + Urheber filter (siehe #18) if query: terms = [t.lower() for t in query.split() if t] results = [ d for d in results if all(t in f"{d.title} {' '.join(d.fraktionen)}".lower() for t in terms) ] return results[:limit] async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Look up a single Drucksache by ID. 
async def download_text(self, drucksache: str) -> Optional[str]:
    """Download the PDF for a Drucksache and extract its text.

    Returns:
        The concatenated text of all pages, or ``None`` when the document
        is unknown, has no link, the download does not return HTTP 200, or
        any error occurs (errors are logged).
    """
    import fitz  # PyMuPDF

    doc = await self.get_document(drucksache)
    if not doc or not doc.link:
        return None

    async with httpx.AsyncClient(
        timeout=60,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
    ) as client:
        try:
            resp = await client.get(doc.link)
            if resp.status_code != 200:
                # Logged like the other adapters do, so a failed download
                # is distinguishable from an unknown document.
                logger.error(
                    "%s PDF HTTP %s for %s (%s)",
                    self.bundesland, resp.status_code, drucksache, doc.link,
                )
                return None
            pdf = fitz.open(stream=resp.content, filetype="pdf")
            try:
                return "".join(page.get_text() for page in pdf)
            finally:
                # Release the document even when text extraction raises.
                pdf.close()
        except Exception:
            logger.exception(
                "%s PDF download error for %s", self.bundesland, drucksache
            )
            return None


class BayernAdapter(ParlamentAdapter):
    """Adapter for Bayerischer Landtag.

    Placeholder: none of the three operations is implemented yet, so every
    call reports "nothing found".
    """

    bundesland = "BY"
    name = "Bayerischer Landtag"
    base_url = "https://www.bayern.landtag.de"

    async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
        """Return no results (search is not implemented)."""
        # TODO: Implement Bayern search
        return []

    async def get_document(self, drucksache: str) -> Optional[Drucksache]:
        """Return ``None`` (lookup is not implemented)."""
        # TODO: Implement
        return None

    async def download_text(self, drucksache: str) -> Optional[str]:
        """Return ``None`` (download is not implemented)."""
        return None
``serverrecordname`` ist ``"vorgang"`` statt ``"sr_generic1"``, ``format`` ist ``"suchergebnis-vorgang-full"``, ``sort`` ist ``"SORT01/D SORT02/D SORT03"``. Es gibt kein ``parsed`` und kein ``json``-Tree — der Server akzeptiert das minimale Schema direkt. 2. **Async polling:** Im Gegensatz zu LSA/BE liefert die initiale ``Fulltext/Search``-Antwort nur eine ``search_id`` mit ``status: "running"``, KEINE ``report_id``. Erst eine zweite ``SearchAndDisplay``-Anfrage mit ``id: `` (und ohne ``search``-Component) bekommt die fertige ``report_id`` zurück. In meinen Live-Tests reichte ein einziger 2-Sekunden-Sleep zwischen den Calls. 3. **Hit-Format:** Die ``report.tt.html``-Antwort liefert keine Perl-Dump-Blöcke (LSA) und keine Bootstrap-Card-Divs (BE), sondern **JSON-Records in HTML-Kommentaren**:: Der Parser zieht die Comments raw raus und mappt die WMV/EWBV- Felder auf das ``Drucksache``-Dataclass. Reverse-Engineering-Quelle: ``dokukratie/scrapers/portala.query.bw.json`` + Live-HAR gegen ``parlis.landtag-bw.de`` (Issue #29). """ # Reverse-engineered field map for the JSON records that come embedded # in HTML comments inside report.tt.html responses. # # Records look like ```` and may contain # nested ``...`` highlight tags inside the JSON values. # Non-greedy match against the literal closing ``}-->`` because that # delimiter does not appear inside the JSON payload itself. _RE_RECORD = re.compile(r"", re.DOTALL) _RE_DRUCKSACHE = re.compile(r"Drucksache\s+(\d+/\d+)") _RE_DATUM = re.compile(r"(\d{1,2}\.\d{1,2}\.\d{4})") def __init__( self, *, bundesland: str, name: str, base_url: str, wahlperiode: int, prefix: str = "/parlis", document_typ: str = "Antrag", date_window_days: int = 730, poll_attempts: int = 15, poll_interval_seconds: float = 2.0, ) -> None: """Configure a PARLIS adapter for one specific parliament instance. Args: bundesland: state code, e.g. ``"BW"``. name: human-readable label. base_url: ``https://parlis.landtag-bw.de`` (no trailing slash). 
wahlperiode: legislative period — feeds into ``lines.l1``. prefix: app prefix where PARLIS lives. ``/parlis`` for BW. document_typ: feeds into ``lines.l4``. The server interprets this as a German document type label like ``"Antrag"``. date_window_days: look-back window for the search range, quick-win against title-only filtering — same approach as the PortalaAdapter for LSA/BE. poll_attempts: how many times to poll for ``report_id`` before giving up. ~15 × 2s = 30s upper bound. poll_interval_seconds: sleep between poll attempts. """ self.bundesland = bundesland self.name = name self.base_url = base_url.rstrip("/") self.prefix = "/" + prefix.strip("/") self.wahlperiode = wahlperiode self.document_typ = document_typ self.date_window_days = date_window_days self.poll_attempts = poll_attempts self.poll_interval_seconds = poll_interval_seconds @staticmethod def _datum_de_to_iso(datum_de: str) -> str: """DD.MM.YYYY → YYYY-MM-DD; '' for empty input.""" if not datum_de: return "" try: d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" except ValueError: return "" def _normalize_fraktion(self, text: str) -> list[str]: """Thin shim — siehe ``app.parteien.extract_fraktionen``. #55. PARLIS packt den Originator in ``EWBV23`` wie ``"Antrag Felix Herkens (GRÜNE), Saskia Frank (GRÜNE)..."``. """ from .parteien import extract_fraktionen return extract_fraktionen(text, bundesland=self.bundesland) def _build_initial_body(self, start_date: str, end_date: str) -> dict: """Build the first ``SearchAndDisplay`` body with the search component. The schema follows ``dokukratie/scrapers/portala.query.bw.json`` verbatim — only the placeholder values are substituted. 
""" return { "action": "SearchAndDisplay", "report": { "rhl": "main", "rhlmode": "add", "format": "suchergebnis-vorgang-full", "mime": "html", "sort": "SORT01/D SORT02/D SORT03", }, "search": { "lines": { "l1": str(self.wahlperiode), "l2": start_date, "l3": end_date, "l4": self.document_typ, }, "serverrecordname": "vorgang", }, "sources": ["Star"], } def _build_poll_body(self, search_id: str) -> dict: """Build the polling body — same action, but with the search_id instead of a fresh search component.""" return { "action": "SearchAndDisplay", "report": { "rhl": "main", "rhlmode": "add", "format": "suchergebnis-vorgang-full", "mime": "html", "sort": "SORT01/D SORT02/D SORT03", }, "id": search_id, "sources": ["Star"], } def _hit_record_to_drucksache(self, record: dict) -> Optional[Drucksache]: """Map a single JSON-in-comment record to a ``Drucksache``. PARLIS-record schema (reverse-engineered, all values are arrays of ``{"main": ...}`` dicts): - ``EWBV22``: "Drucksache 17/10323" - ``EWBD05``: direct PDF URL - ``EWBV23``: "Antrag " — single combined line - ``WMV30``: short Urheber summary ("Felix Herkens (GRÜNE) u. a.") - ``WMV33``: subject keywords (Schlagworte) - ``EWBD01``: "Drucksache " """ def first(field: str) -> str: block = record.get(field) if isinstance(block, list) and block: return (block[0].get("main") or "").strip() return "" ds_text = first("EWBV22") or first("EWBD01") m_ds = self._RE_DRUCKSACHE.search(ds_text) if not m_ds: return None drucksache = m_ds.group(1) # The "title" we want is the Schlagworte/topic, not the # Drucksachen-Header. PARLIS keeps the human-readable subject # in WMV33 (Schlagworte joined by semicolons) — that's the # closest equivalent to "title" the LSA/BE adapters expose. # Fallback to the EWBV23 line if WMV33 is empty. schlagworte = first("WMV33") # Strip embedded ... 
async def _initial_search_and_poll(
    self,
    client: httpx.AsyncClient,
    start_date: str,
    end_date: str,
) -> Optional[str]:
    """Start a search and poll until the server hands out a ``report_id``.

    Steps: request the browse page, POST the initial search, then re-POST
    the poll body up to ``poll_attempts`` times with
    ``poll_interval_seconds`` of sleep before each attempt.

    Returns:
        The ``report_id``, or ``None`` when a request fails, a reply is not
        HTTP 200, no ``search_id`` is returned, the search stops without a
        report, or the attempts run out.
    """
    import asyncio

    browse_html = f"{self.base_url}{self.prefix}/browse.tt.html"
    browse_json = f"{self.base_url}{self.prefix}/browse.tt.json"
    referer = {"Referer": browse_html}

    # Step 1: warm cookies
    await client.get(browse_html)

    # Step 2: initial search
    try:
        first = await client.post(
            browse_json,
            json=self._build_initial_body(start_date, end_date),
            headers=referer,
        )
    except Exception:
        logger.exception("%s initial search request error", self.bundesland)
        return None
    if first.status_code != 200:
        logger.error("%s initial search HTTP %s", self.bundesland, first.status_code)
        return None

    payload = first.json()
    report_id = payload.get("report_id")
    if report_id:
        return report_id
    search_id = payload.get("search_id")
    if not search_id:
        logger.error("%s no search_id in initial response: %s", self.bundesland, payload)
        return None

    # Step 3: poll until report_id appears or we run out of attempts
    for _attempt in range(self.poll_attempts):
        await asyncio.sleep(self.poll_interval_seconds)
        try:
            polled = await client.post(
                browse_json,
                json=self._build_poll_body(search_id),
                headers=referer,
            )
        except Exception:
            logger.exception("%s poll request error", self.bundesland)
            return None
        if polled.status_code != 200:
            logger.error("%s poll HTTP %s", self.bundesland, polled.status_code)
            return None

        payload = polled.json()
        report_id = payload.get("report_id")
        if report_id:
            return report_id
        # No report at this point; a stopped source means the search
        # finished empty, so further polling is pointless.
        star_state = payload.get("sources", {}).get("Star", {})
        if star_state.get("status") == "stopped":
            return None

    logger.warning(
        "%s gave up polling after %d attempts", self.bundesland, self.poll_attempts
    )
    return None


def _parse_report_html(self, html: str) -> list[Drucksache]:
    """Extract Drucksachen from a ``report.tt.html`` response.

    Records are JSON objects embedded in HTML comments. Each comment block
    matched by ``_RE_RECORD`` is parsed as JSON and mapped through
    :meth:`_hit_record_to_drucksache`. Blocks that are not valid JSON and
    records that cannot be mapped are dropped.
    """

    def decoded_records():
        for match in self._RE_RECORD.finditer(html):
            try:
                yield json.loads(match.group(1))
            except json.JSONDecodeError:
                continue

    return [
        doc
        for doc in map(self._hit_record_to_drucksache, decoded_records())
        if doc
    ]
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
    """Look up a single Drucksache by ID via a broad browse.

    Runs :meth:`search` without a text filter for up to 200 hits and
    returns the first one whose ``drucksache`` equals the requested ID, or
    ``None`` when none matches.
    """
    results = await self.search(query="", limit=200)
    return next((doc for doc in results if doc.drucksache == drucksache), None)


async def download_text(self, drucksache: str) -> Optional[str]:
    """Download the PDF for a Drucksache and extract its text.

    Returns:
        The concatenated text of all pages, or ``None`` when the document
        is unknown, has no link, the download does not return HTTP 200, or
        any error occurs (errors are logged).
    """
    import fitz  # PyMuPDF

    doc = await self.get_document(drucksache)
    if not doc or not doc.link:
        return None

    async with httpx.AsyncClient(
        timeout=60,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
    ) as client:
        try:
            resp = await client.get(doc.link)
            if resp.status_code != 200:
                logger.error(
                    "%s PDF HTTP %s for %s (%s)",
                    self.bundesland, resp.status_code, drucksache, doc.link,
                )
                return None
            pdf = fitz.open(stream=resp.content, filetype="pdf")
            try:
                return "".join(page.get_text() for page in pdf)
            finally:
                # Release the document even when text extraction raises.
                pdf.close()
        except Exception:
            logger.exception(
                "%s PDF download error for %s", self.bundesland, drucksache
            )
            return None
PARiS ist die alte Java-Servlet-Variante von StarWeb (anders als HE/starweb.hessen.de, das auf dem moderneren eUI läuft). Die Suche geht über genau einen POST-Call gegen ``/starweb/paris/servlet.starweb`` mit form-urlencoded Body. Response ist ein vollständiges HTML- Ergebnis-Page mit ````-Hits. Hit-Format pro ````: - ``S`` oder ``L`` als Indikator - ``

TITEL

`` - Stichworte (Thesaurus-Links, ignoriert) - ``Drs 21/730 S`` (Drucksachen-Nr mit S/L-Suffix) - ``Änderungsantrag vom 23.02.2026`` (Typ + Datum) - ``SPD, BÜNDNIS 90/DIE GRÜNEN, Die Linke`` (Fraktionen) - ```` Bremen hat zwei parallele Parlamente: Bürgerschaft (Landtag) für landespolitische Anträge und Stadtbürgerschaft für Bremens kommunale Sachen. Wir lassen beide durch (``PARL=S OR L``) — der Stadtbürgerschafts-Anteil ist für die GWÖ-Bilanzierung sogar interessanter, weil viele Entscheidungen auf kommunaler Ebene laufen. """ bundesland = "HB" name = "Bremische Bürgerschaft (PARiS)" base_url = "https://paris.bremische-buergerschaft.de" servlet_path = "/starweb/paris/servlet.starweb" wahlperiode = 21 # Pro-Hit-Regex über das ``-Pattern _RE_TR = re.compile( r']*>([\s\S]*?)', re.IGNORECASE, ) _RE_TITLE = re.compile(r']*>\s*]*>(.*?)', re.DOTALL) _RE_DRUCKSACHE = re.compile(r'Drs\s*\s*(\d+/\d+)\s*([SL]?)\s*') _RE_TYP_DATUM = re.compile(r'\s*,\s*([^,<\n]+?)\s+vom\s+(\d{1,2}\.\d{1,2}\.\d{4})') _RE_FRAKTIONEN_AFTER_DATUM = re.compile(r'vom\s+\d{1,2}\.\d{1,2}\.\d{4}\s*\s*([^<]+)') _RE_PDF_LINK = re.compile( r']*target="new"', re.IGNORECASE, ) def _normalize_fraktion(self, text: str) -> list[str]: from .parteien import extract_fraktionen return extract_fraktionen(text, bundesland=self.bundesland) @staticmethod def _datum_de_to_iso(datum_de: str) -> str: try: d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" except ValueError: return "" @staticmethod def _strip_html(s: str) -> str: """Entferne HTML-Tags und entities aus einem Snippet.""" s = re.sub(r"<[^>]+>", "", s) s = s.replace("–", "–").replace(" ", " ") s = re.sub(r"&[a-zA-Z]+;", " ", s) return re.sub(r"\s+", " ", s).strip() def _parse_record_html(self, chunk: str) -> Optional[Drucksache]: m_ds = self._RE_DRUCKSACHE.search(chunk) if not m_ds: return None nr_only = m_ds.group(1) # "21/730" suffix = m_ds.group(2) or "" # "S" oder "L" # Drucksachen-ID: ohne Whitespace, mit Suffix dahinter wenn 
def _build_form_body(self, query: str) -> dict:
    """Build the form body for a PARiS search.

    Fields:
        - ``path=paris/LISSHFL.web``: the LISSH proceedings database.
        - ``format=LISSH_BrowseVorgang_Report``: browse format with all
          hits on one page (no pagination).
        - ``01_LISSHFL_Themen``: thesaurus full-text search. The server
          accepts no ``*`` wildcard and times out on an empty value, so an
          empty query is replaced by a high-frequency stop word as a
          catch-all.
        - ``02_LISSHFL_PARL=S OR L``: Stadtbürgerschaft + Landtag.
        - ``03_LISSHFL_WP``: current Wahlperiode (no range; a multi-WP
          range took 60s+ in testing).

    Args:
        query: Full-text term; empty selects the catch-all.

    Returns:
        The form fields as a plain dict.
    """
    return {
        "path": "paris/LISSHFL.web",
        "format": "LISSH_BrowseVorgang_Report",
        "01_LISSHFL_Themen": query or "der",  # frequent stop word
        "02_LISSHFL_PARL": "S OR L",
        "03_LISSHFL_WP": str(self.wahlperiode),
    }


async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
    """Run a single-POST search against the PARiS servlet.

    Only records whose type contains "antrag" (case-insensitive) are kept.

    Args:
        query: Full-text term forwarded to the server; empty selects the
            catch-all of :meth:`_build_form_body`.
        limit: Maximum number of documents to return.

    Returns:
        Up to ``limit`` documents in page order; ``[]`` on a non-200 reply
        or any error (both logged).
    """
    body = self._build_form_body(query)
    async with httpx.AsyncClient(
        timeout=60,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
    ) as client:
        try:
            resp = await client.post(
                f"{self.base_url}{self.servlet_path}",
                data=body,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
            if resp.status_code != 200:
                logger.error("HB PARiS HTTP %s", resp.status_code)
                return []

            results: list[Drucksache] = []
            for chunk in self._RE_TR.findall(resp.text):
                doc = self._parse_record_html(chunk)
                if not doc:
                    continue
                if "antrag" not in (doc.typ or "").lower():
                    continue
                results.append(doc)
                if len(results) >= limit:
                    break
            return results
        except Exception:
            logger.exception("HB PARiS search error")
            return []


async def get_document(self, drucksache: str) -> Optional[Drucksache]:
    """Look up one Drucksache by scanning a catch-all search result.

    Args:
        drucksache: ID such as ``21/730`` or, with parliament suffix,
            ``21/730S``.

    Returns:
        The matching document, or ``None`` when the ID is malformed or is
        not among the first 200 hits that :meth:`search` returns (which
        only contain "antrag" types).
    """
    if not re.fullmatch(r"\d+/\d+[SL]?", drucksache):
        return None
    # An empty query makes _build_form_body fall back to its stop-word
    # catch-all. "*" must not be used: the server accepts no wildcard.
    results = await self.search("", limit=200)
    return next((d for d in results if d.drucksache == drucksache), None)
class StarWebHEAdapter(ParlamentAdapter):
    """Hesse-specific eUI adapter (#24/#30).

    starweb.hessen.de runs on an eUI backend with a synchronous two-step
    flow (unlike BW PARLIS, which polls asynchronously):

    1. POST ``/portal/browse.tt.json`` with ``action=SearchAndDisplay``
       -> the response carries ``report_id`` directly
    2. GET ``/portal/report.tt.html?report_id=...`` -> HTML with the hits

    Hit format: cards in ``efxRecordRepeater`` divs; the data sits in
    Perl-style dumps embedded in HTML comments. Field mapping:

    - ``WEV01`` -> title
    - ``WEV02`` -> date
    - ``WEV03`` -> type
    - ``WEV07`` -> PDF URL
    - ``WEV08`` -> Drucksache number
    - ``WEV12`` -> author / parliamentary group

    Source: ``hlt.lis`` (Hessischer Landtag), Wahlperiode 21.
    """

    # Captures the body of every HTML comment. An empty pattern would make
    # ``findall`` return only empty strings, so no record could ever parse.
    # Comments without a ``WEV08`` field are skipped by the parser.
    # NOTE(review): pattern reconstructed from the documented hit format —
    # confirm against a live report response.
    _RE_HE_COMMENT_DUMP = re.compile(r"<!--(.*?)-->", re.DOTALL)
    _RE_HE_WEV01 = re.compile(r"'WEV01'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
    _RE_HE_WEV02 = re.compile(r"'WEV02'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"'](\d{1,2}\.\d{1,2}\.\d{4})[\"']")
    _RE_HE_WEV03 = re.compile(r"'WEV03'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
    _RE_HE_WEV07 = re.compile(r"'WEV07'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
    _RE_HE_WEV08 = re.compile(r"'WEV08'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"'](\d+/\d+)[\"']")
    _RE_HE_WEV12 = re.compile(r"'WEV12'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")

    bundesland = "HE"
    name = "Hessischer Landtag (StarWeb)"
    base_url = "https://starweb.hessen.de"
    portal_path = "/portal"
    wahlperiode = 21

    def _normalize_fraktion(self, text: str) -> list[str]:
        """Delegate to ``parteien.extract_fraktionen`` for this state."""
        from .parteien import extract_fraktionen

        return extract_fraktionen(text, bundesland=self.bundesland)

    @staticmethod
    def _datum_de_to_iso(datum_de: str) -> str:
        """Convert ``D.M.YYYY`` to ``YYYY-MM-DD``.

        Returns an empty string for empty input or when the value does not
        split into exactly three dot-separated parts.
        """
        if not datum_de:
            return ""
        try:
            d, m, y = datum_de.split(".")
        except ValueError:
            return ""
        return f"{y}-{m.zfill(2)}-{d.zfill(2)}"

    @staticmethod
    def _decode_perl_hex(text: str) -> str:
        """Replace Perl escapes such as ``\\x{e9}`` with the character itself.

        Escapes whose value is not a valid code point are left untouched
        instead of raising.
        """

        def _replace(match: "re.Match[str]") -> str:
            try:
                return chr(int(match.group(1), 16))
            except (ValueError, OverflowError):
                # Beyond U+10FFFF: keep the raw escape text.
                return match.group(0)

        return re.sub(r"\\x\{([0-9a-fA-F]+)\}", _replace, text)

    def _build_initial_body(self, query: str = "") -> dict:
        """Build the JSON body for the browse request.

        Restricts to the current Wahlperiode and, if ``query`` is given,
        adds a full-text term.

        The server strictly requires a ``search.json`` term tree rooted in
        ``not(query, NOWEB=X)``. ``parsed``/``sref`` alone are not enough:
        the server ignores them and only returns ``facets``.

        Args:
            query: Optional full-text search string. It is interpolated
                into the term expression without escaping.

        Returns:
            The request body as a dict.
        """
        wp_str = str(self.wahlperiode)
        wp_term = {
            "tn": "term",
            "t": wp_str,
            "sf": "WP",
            "op": "eq",
            "idx": 45,
            "l": 3,
            "num": 1,
        }
        # Build the top-level NOT tree: NOT(query_subtree, NOWEB=X)
        if query:
            vtdrs_term = {
                "tn": "term",
                "t": f"\"(/VT ('\\\"{query}\\\"'))\"",
                "sf": "VTDRS",
                "op": "eq",
                "idx": 9,
                "l": 3,
                "num": 3,
            }
            inner = {"tn": "and", "terms": [vtdrs_term, wp_term], "num": 4}
            parsed = (
                f"((/VTDRS \"(/VT ('\\\"{query}\\\"'))\") "
                f"AND (/WP {wp_str})) AND NOT NOWEB=X"
            )
        else:
            inner = wp_term
            parsed = f"(/WP {wp_str}) AND NOT NOWEB=X"

        json_tree = [{
            "tn": "not",
            "terms": [
                inner,
                {"tn": "term", "t": "X", "sf": "NOWEB", "op": "eq",
                 "idx": 100, "l": 3, "num": 2},
            ],
        }]

        return {
            "action": "SearchAndDisplay",
            "sources": ["hlt.lis"],
            "report": {
                "rhl": "main",
                "rhlmode": "add",
                "format": "generic2-short",
                "mime": "html",
                "sort": "WPSORT/D DRSORT/D",
            },
            "search": {
                "lines": {"1": query, "2": wp_str},
                "serverrecordname": "generic2Search",
                "parsed": parsed,
                "sref": parsed,
                "json": json_tree,
            },
        }

    async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
        """Run the synchronous two-step search against starweb.hessen.de.

        Args:
            query: Search text; also applied client-side as an AND filter
                over title and parliamentary groups.
            limit: Maximum number of documents to return.

        Returns:
            Up to ``limit`` documents whose ``typ`` contains "antrag".
            An empty list on a non-200 response, a missing ``report_id``
            or any exception (the exception is logged).
        """
        body = self._build_initial_body(query)
        browse_url = f"{self.base_url}{self.portal_path}/browse.tt.json"
        report_url = f"{self.base_url}{self.portal_path}/report.tt.html"

        async with httpx.AsyncClient(
            timeout=60,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
        ) as client:
            try:
                resp = await client.post(browse_url, json=body)
                if resp.status_code != 200:
                    logger.error("HE browse HTTP %s", resp.status_code)
                    return []
                data = resp.json()
                report_id = data.get("report_id")
                if not report_id:
                    logger.error("HE: no report_id in browse response keys=%s", sorted(data.keys()))
                    return []

                # Step 2: report.tt.html with chunksize — without that
                # parameter the server returns only the very first hit
                # (8 KB HTML). 1500 is the floor, analogous to #61
                # PortalaAdapter, because hit density is low after the
                # client-side Antrag filter (HE has ~1:30 Antrag/Anfrage).
                chunksize = max(limit * 30, 1500)
                rep = await client.get(
                    report_url,
                    params={
                        "report_id": report_id,
                        "start": 0,
                        "chunksize": chunksize,
                    },
                )
                if rep.status_code != 200:
                    logger.error("HE report HTTP %s", rep.status_code)
                    return []

                results = self._parse_report_html(rep.text)
                # Client-side Antrag filter (analogous to #61 bug 2/3 for portala)
                results = [d for d in results if "antrag" in (d.typ or "").lower()]
                # Optional client-side query filter: every term must occur
                # in title or parliamentary groups.
                if query:
                    qterms = query.lower().split()
                    results = [
                        d for d in results
                        if all(
                            t in (d.title.lower() + " " + " ".join(d.fraktionen).lower())
                            for t in qterms
                        )
                    ]
                return results[:limit]
            except Exception:
                # Best-effort boundary: log and report "no results".
                logger.exception("HE search error")
                return []

    def _parse_report_html(self, html: str) -> list[Drucksache]:
        """Extract documents from the comment dumps in a report page.

        Maps the ``WEV01``–``WEV12`` fields onto ``Drucksache`` fields.
        Dumps without a ``WEV08`` (Drucksache number) are skipped; other
        missing fields fall back to defaults.
        """
        results: list[Drucksache] = []
        for dump in self._RE_HE_COMMENT_DUMP.findall(html):
            m_ds = self._RE_HE_WEV08.search(dump)
            if not m_ds:
                continue
            drucksache = m_ds.group(1)

            m_t = self._RE_HE_WEV01.search(dump)
            title = self._decode_perl_hex(m_t.group(1)) if m_t else f"Drucksache {drucksache}"

            m_pdf = self._RE_HE_WEV07.search(dump)
            pdf_url = m_pdf.group(1) if m_pdf else ""
            # Upgrade plain-HTTP links to HTTPS.
            if pdf_url.startswith("http://"):
                pdf_url = "https://" + pdf_url[len("http://"):]

            m_dat = self._RE_HE_WEV02.search(dump)
            datum_iso = self._datum_de_to_iso(m_dat.group(1)) if m_dat else ""

            m_typ = self._RE_HE_WEV03.search(dump)
            typ = self._decode_perl_hex(m_typ.group(1)) if m_typ else "Drucksache"

            m_urheber = self._RE_HE_WEV12.search(dump)
            urheber = self._decode_perl_hex(m_urheber.group(1)) if m_urheber else ""
            fraktionen = self._normalize_fraktion(urheber)

            results.append(Drucksache(
                drucksache=drucksache,
                title=title,
                fraktionen=fraktionen,
                datum=datum_iso,
                link=pdf_url,
                bundesland=self.bundesland,
                typ=typ,
            ))
        return results

    async def get_document(self, drucksache: str) -> Optional[Drucksache]:
        """Look up one document by scanning ``search("", limit=200)``.

        There is no direct ID filter; returns the first hit whose
        ``drucksache`` equals the given ID, or ``None``.
        """
        results = await self.search("", limit=200)
        for d in results:
            if d.drucksache == drucksache:
                return d
        return None

    async def download_text(self, drucksache: str) -> Optional[str]:
        """Download the document's PDF and return its extracted text.

        Returns ``None`` when the document is unknown, has no link, the
        download is not HTTP 200, or any exception occurs (logged).
        """
        import fitz  # PyMuPDF, imported at call time

        doc = await self.get_document(drucksache)
        if not doc or not doc.link:
            return None
        async with httpx.AsyncClient(
            timeout=60,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
        ) as client:
            try:
                resp = await client.get(doc.link)
                if resp.status_code != 200:
                    return None
                # The context manager closes the PDF even if a page fails
                # to extract; previously the handle leaked on that path.
                with fitz.open(stream=resp.content, filetype="pdf") as pdf:
                    return "".join(page.get_text() for page in pdf)
            except Exception:
                logger.exception("HE PDF download error for %s", drucksache)
                return None
class BundestagAdapter(ParlamentAdapter):
    """Adapter for the German Bundestag via the DIP API.

    Source: ``search.dip.bundestag.de/api/v1`` — the official REST API of
    the Dokumentations- und Informationssystem (DIP). Schema documented at
    https://dip.bundestag.de/über-dip/hilfe/api (SPA, content lives in the
    bundle ``main.*.chunk.js``).

    Auth is the URL parameter ``apikey=...`` PLUS an
    ``Origin: https://dip.bundestag.de`` header — the server origin-locks
    to its own single-page app. The API key sits in plain sight in
    ``dip-config.js`` and the DIP frontend sends it as a URL parameter on
    every request. As long as the Origin header is set, the API accepts
    server-to-server calls as well.

    Document mapping (``/api/v1/drucksache``):

    - ``dokumentnummer`` -> ``drucksache`` (e.g. ``"21/5136"``)
    - ``titel`` -> ``title``
    - ``urheber[*].bezeichnung``/``titel`` -> ``fraktionen`` (normalized
      by ``parteien.extract_fraktionen``, which covers
      ``"Fraktion der AfD"`` -> ``"AfD"``)
    - ``datum`` -> ``datum`` (already ISO YYYY-MM-DD)
    - ``fundstelle.pdf_url`` -> ``link``
    - ``drucksachetyp`` -> ``typ`` (filtered to ``"Antrag"``)

    Pagination uses the ``cursor`` parameter — the server returns a new
    cursor with every result, which is sent along with the next request.
    100 hits per page, ~600 Anträge per Wahlperiode.
    """

    bundesland = "BUND"
    name = "Deutscher Bundestag (DIP)"
    base_url = "https://search.dip.bundestag.de/api/v1"

    # Scraped from dip-config.js (public, plain text, sent by the DIP SPA
    # with every request). Origin-locking makes the key non-trivial to
    # reuse, but it is fully functional for server-to-server calls with
    # the Origin header set.
    # NOTE(review): callers can override it via the ``apikey`` argument.
    DEFAULT_APIKEY = "SbGXhWA.3cpnNdb8rkht7iWpvSgTP8XIG88LoCrGd4"
    ORIGIN = "https://dip.bundestag.de"

    def __init__(
        self,
        *,
        apikey: Optional[str] = None,
        wahlperiode: int = 21,
        document_typ: str = "Antrag",
    ):
        """Configure the adapter.

        Args:
            apikey: API key; falls back to ``DEFAULT_APIKEY`` when falsy.
            wahlperiode: Value sent as the ``f.wahlperiode`` filter.
            document_typ: Value sent as the ``f.drucksachetyp`` filter.
        """
        self.apikey = apikey or self.DEFAULT_APIKEY
        self.wahlperiode = wahlperiode
        self.document_typ = document_typ

    def _make_client(self) -> httpx.AsyncClient:
        """Return an HTTP client carrying the Origin/Referer headers."""
        return httpx.AsyncClient(
            timeout=30,
            follow_redirects=True,
            headers={
                "Origin": self.ORIGIN,
                "Referer": f"{self.ORIGIN}/",
                "User-Agent": "Mozilla/5.0 GWOE-Antragspruefer",
                "Accept": "application/json",
            },
        )

    def _doc_to_drucksache(self, doc: dict) -> Optional[Drucksache]:
        """Map one DIP ``/drucksache`` JSON object onto ``Drucksache``.

        Returns ``None`` when ``dokumentnummer`` or ``fundstelle.pdf_url``
        is missing. ``titel``, ``datum`` and ``drucksachetyp`` fall back
        to their defaults both when the key is absent and when its value
        is JSON ``null``.
        """
        from .parteien import extract_fraktionen

        nummer = doc.get("dokumentnummer")
        if not nummer:
            return None

        # Take the PDF URL from fundstelle — that is the reliable address.
        fundstelle = doc.get("fundstelle") or {}
        pdf_url = fundstelle.get("pdf_url") or ""
        if not pdf_url:
            return None

        # Extract the groups from the urheber list. DIP lists them as
        # "Fraktion der AfD" or similar — extract_fraktionen already knows
        # that pattern from the Landtag adapters.
        urheber_strs = [
            u.get("titel") or u.get("bezeichnung") or ""
            for u in (doc.get("urheber") or [])
            if isinstance(u, dict)
        ]
        urheber_combined = ", ".join(filter(None, urheber_strs))
        fraktionen = extract_fraktionen(urheber_combined, bundesland=self.bundesland)

        # ``or`` instead of a ``.get`` default: a present-but-null field
        # would otherwise yield None and break ``title.lower()`` in search().
        return Drucksache(
            drucksache=nummer,
            title=doc.get("titel") or "",
            fraktionen=fraktionen,
            datum=doc.get("datum") or "",
            link=pdf_url,
            bundesland=self.bundesland,
            typ=doc.get("drucksachetyp") or "Antrag",
        )

    async def _fetch_page(
        self,
        client: httpx.AsyncClient,
        *,
        cursor: Optional[str] = None,
    ) -> tuple[list[dict], Optional[str]]:
        """Load one page from the ``/drucksache`` endpoint.

        Args:
            client: Client to issue the request with.
            cursor: Pagination cursor from the previous page, if any.

        Returns:
            ``(docs, next_cursor)``; ``([], None)`` on a non-200 response
            or any exception (both are logged).
        """
        params = {
            "apikey": self.apikey,
            "f.drucksachetyp": self.document_typ,
            "f.wahlperiode": str(self.wahlperiode),
        }
        if cursor:
            params["cursor"] = cursor
        try:
            resp = await client.get(f"{self.base_url}/drucksache", params=params)
            if resp.status_code != 200:
                logger.error("BUND DIP HTTP %s: %s", resp.status_code, resp.text[:200])
                return [], None
            data = resp.json()
            return data.get("documents") or [], data.get("cursor")
        except Exception:
            logger.exception("BUND DIP request error")
            return [], None

    async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
        """List documents of the configured Wahlperiode and type.

        Pages through the API via cursor until ``limit`` documents are
        collected, a page comes back empty, or the cursor stops changing.
        The query is applied client-side as a title substring filter (all
        whitespace-separated terms must occur) — the DIP API has an
        ``f.titel`` filter, but for consistency with the Landtag adapters
        (all filter client-side because of schema drift) it is done here
        as well.

        Args:
            query: Search terms; empty means no title filter.
            limit: Collection stops once this many documents were gathered.

        Returns:
            The collected documents, de-duplicated by ``drucksache``.
        """
        results: list[Drucksache] = []
        seen: set[str] = set()
        query_terms = [t.lower() for t in query.split()] if query else []

        async with self._make_client() as client:
            cursor: Optional[str] = None
            for _ in range(20):  # max 20 pages = 2000 docs as a hard cap
                docs, next_cursor = await self._fetch_page(client, cursor=cursor)
                if not docs:
                    break
                for raw in docs:
                    doc = self._doc_to_drucksache(raw)
                    if not doc or doc.drucksache in seen:
                        continue
                    seen.add(doc.drucksache)
                    if query_terms:
                        hay = doc.title.lower()
                        if not all(t in hay for t in query_terms):
                            continue
                    results.append(doc)
                    if len(results) >= limit:
                        return results
                # Cursor unchanged -> last page reached
                if not next_cursor or next_cursor == cursor:
                    break
                cursor = next_cursor
        return results

    async def get_document(self, drucksache: str) -> Optional[Drucksache]:
        """Look up a single Drucksache by ID.

        Uses the ``f.dokumentnummer`` filter — a direct hit without
        pagination.

        Returns:
            The mapped document for the first result whose
            ``dokumentnummer`` equals ``drucksache``; ``None`` on a
            non-200 response, no match, or any exception (logged).
        """
        async with self._make_client() as client:
            try:
                resp = await client.get(
                    f"{self.base_url}/drucksache",
                    params={
                        "apikey": self.apikey,
                        "f.dokumentnummer": drucksache,
                        "f.wahlperiode": str(self.wahlperiode),
                    },
                )
                if resp.status_code != 200:
                    return None
                docs = resp.json().get("documents") or []
                for raw in docs:
                    if raw.get("dokumentnummer") == drucksache:
                        return self._doc_to_drucksache(raw)
            except Exception:
                logger.exception("BUND get_document error for %s", drucksache)
        return None

    async def download_text(self, drucksache: str) -> Optional[str]:
        """Download the Drucksache PDF and extract its full text.

        Returns ``None`` when the document is unknown, has no link, the
        download is not HTTP 200, or any exception occurs (logged).
        """
        import fitz  # PyMuPDF, imported at call time

        doc = await self.get_document(drucksache)
        if not doc or not doc.link:
            return None
        async with httpx.AsyncClient(
            timeout=60,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
        ) as client:
            try:
                resp = await client.get(doc.link)
                if resp.status_code != 200:
                    return None
                # The context manager closes the PDF even if a page fails
                # to extract; previously the handle leaked on that path.
                with fitz.open(stream=resp.content, filetype="pdf") as pdf:
                    return "".join(page.get_text() for page in pdf)
            except Exception:
                logger.exception("BUND download error for %s", drucksache)
                return None
document_type=None, # Quick-win for #13: pulled the date window from the original # 180-day MVP up to 730 days so client-side title-filter searches # ("Schule" etc.) reach back across more of the WP19 corpus until # the eUI fulltext-sf is reverse-engineered. The chunksize bump # in PortalaAdapter.search() means the per-request payload stays # bounded. date_window_days=730, pdf_url_prefix="/files/", ), "MV": ParLDokAdapter( bundesland="MV", name="Landtag Mecklenburg-Vorpommern (ParlDok)", base_url="https://www.dokumentation.landtag-mv.de", wahlperiode=8, prefix="/parldok", document_typ="Antrag", ), "HH": ParLDokAdapter( bundesland="HH", name="Hamburgische Bürgerschaft (ParlDok)", base_url="https://www.buergerschaft-hh.de", wahlperiode=23, prefix="/parldok", document_typ="Antrag", ), "TH": ParLDokAdapter( bundesland="TH", name="Thüringer Landtag (ParlDok)", base_url="https://parldok.thueringer-landtag.de", wahlperiode=8, prefix="/parldok", # TH packs Anträge under composite type strings like # "Antrag gemäß § 79 GO" with kind="Vorlage", not the # MV-style kind="Drucksache"/type="Antrag". Substring-match # on "Antrag" plus widened kind list catches them all. document_typ="Antrag", document_typ_substring=True, kinds=["Drucksache", "Vorlage"], ), "SH": StarFinderCGIAdapter( bundesland="SH", name="Schleswig-Holsteinischer Landtag (LIS-SH)", base_url="http://lissh.lvn.parlanet.de", wahlperiode=20, db_path="lisshfl.txt", document_typ_code="antrag", ), "BB": PortalaAdapter( bundesland="BB", name="Landtag Brandenburg (parladoku)", base_url="https://www.parlamentsdokumentation.brandenburg.de", db_id="lbb.lissh", wahlperiode=8, portala_path="/portal", document_type="Antrag", # BB packs the date BEFORE the Drucksachen-Nummer in the h6 # line and uses the BE-style efxRecordRepeater HTML cards; # the auto-detect picks the card path automatically. 
def get_adapter(bundesland: str) -> Optional[ParlamentAdapter]:
    """Return the registered adapter for a state code, or ``None``.

    The lookup is an exact key match against ``ADAPTERS``.
    """
    try:
        return ADAPTERS[bundesland]
    except KeyError:
        return None


async def search_all(query: str, bundesland: str = "NRW", limit: int = 20) -> list[Drucksache]:
    """Search the parliament documents of one state.

    Args:
        query: Search text forwarded to the adapter.
        bundesland: Registry key of the state to search.
        limit: Forwarded to the adapter's ``search``.

    Returns:
        The adapter's search results, or an empty list when no adapter is
        registered for ``bundesland``.
    """
    handler = get_adapter(bundesland)
    if handler:
        return await handler.search(query, limit)
    return []