"""Parliament search adapters for different German states.""" import json import logging import httpx import re from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Optional from bs4 import BeautifulSoup logger = logging.getLogger(__name__) @dataclass class Drucksache: """A parliamentary document.""" drucksache: str # e.g. "18/8125" title: str fraktionen: list[str] datum: str # ISO date link: str # PDF URL bundesland: str typ: str = "Antrag" # Antrag, Anfrage, Beschlussempfehlung, etc. class ParlamentAdapter(ABC): """Base adapter for searching parliament documents.""" bundesland: str name: str @abstractmethod async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search for documents matching query.""" pass @abstractmethod async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Get a specific document by ID.""" pass @abstractmethod async def download_text(self, drucksache: str) -> Optional[str]: """Download and extract text from a document.""" pass class NRWAdapter(ParlamentAdapter): """Adapter for NRW Landtag (opal.landtag.nrw.de).""" bundesland = "NRW" name = "Landtag Nordrhein-Westfalen" base_url = "https://opal.landtag.nrw.de" search_url = "https://opal.landtag.nrw.de/home/dokumente/dokumentensuche/parlamentsdokumente/aktuelle-dokumente.html" def _parse_query(self, query: str) -> tuple[str, list[str], bool]: """ Parse search query for AND logic and exact phrases. 
Returns: (search_term_for_api, filter_terms, is_exact) Examples: - 'Klimaschutz Energie' -> ('Klimaschutz', ['klimaschutz', 'energie'], False) - '"Grüner Stahl"' -> ('Grüner Stahl', ['grüner stahl'], True) - 'Klimaschutz "erneuerbare Energie"' -> ('Klimaschutz', ['klimaschutz', 'erneuerbare energie'], False) """ query = query.strip() # Check for exact phrase (entire query in quotes) if query.startswith('"') and query.endswith('"') and query.count('"') == 2: exact = query[1:-1].strip() return (exact, [exact.lower()], True) # Extract quoted phrases and regular terms import shlex try: parts = shlex.split(query) except ValueError: # Fallback for unbalanced quotes parts = query.split() if not parts: return (query, [query.lower()], False) # Use first term for API search, all terms for filtering filter_terms = [p.lower() for p in parts] return (parts[0], filter_terms, False) def _matches_all_terms(self, doc: 'Drucksache', terms: list[str], is_exact: bool) -> bool: """Check if document matches all search terms (AND logic).""" searchable = f"{doc.title} {doc.drucksache} {' '.join(doc.fraktionen)} {doc.typ}".lower() if is_exact: # Exact phrase must appear return terms[0] in searchable else: # All terms must appear (AND) return all(term in searchable for term in terms) async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search NRW Landtag documents via OPAL portal.""" results = [] # Parse query for AND logic api_query, filter_terms, is_exact = self._parse_query(query) async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: # First, get the page to establish session initial = await client.get(self.search_url) if initial.status_code != 200: print(f"NRW search initial request failed: {initial.status_code}") return [] # Parse for webflow token from pagination links soup = BeautifulSoup(initial.text, 'html.parser') # Find a pagination link to extract the webflow token pagination_link = soup.select_one('a[href*="webflowexecution"]') 
webflow_token = "" webflow_execution = "" if pagination_link: href = pagination_link.get('href', '') # Extract webflowToken and webflowexecution from URL token_match = re.search(r'webflowToken=([^&]*)', href) exec_match = re.search(r'(webflowexecution[^=]+)=([^&]+)', href) if token_match: webflow_token = token_match.group(1) if exec_match: webflow_execution = f"{exec_match.group(1)}={exec_match.group(2)}" # Now perform the search with POST # Find the form action URL with webflow token form = soup.select_one('form#docSearchByItem') form_action = self.search_url if form and form.get('action'): action = form.get('action') if action.startswith('/'): form_action = f"{self.base_url}{action}" elif action.startswith('http'): form_action = action else: form_action = f"{self.search_url}?{action}" # Build form data for "Einfache Suche" (searchByItem form) form_data = { '_eventId_sendform': '1', 'dokNum': api_query, # This is the text search field 'formId': 'searchByItem', 'dokTyp': '', # All types 'wp': '18', # Wahlperiode 18 } # POST request with form data to the form action URL search_resp = await client.post( form_action, data=form_data, cookies=initial.cookies, headers={'Content-Type': 'application/x-www-form-urlencoded'} ) if search_resp.status_code != 200: print(f"NRW search request failed: {search_resp.status_code}") return [] # Parse results soup = BeautifulSoup(search_resp.text, 'html.parser') # Find all document result items (li elements containing articles) items = soup.select('li:has(article)') for item in items[:limit]: try: # Extract drucksache number from first link num_link = item.select_one('a[href*="MMD"]') if not num_link: continue href = num_link.get('href', '') # Extract number: MMD18-12345.pdf -> 18/12345 match = re.search(r'MMD(\d+)-(\d+)\.pdf', href) if not match: continue legislatur, nummer = match.groups() drucksache = f"{legislatur}/{nummer}" pdf_url = f"https://www.landtag.nrw.de{href}" if href.startswith('/') else href # Extract title from the 
title link (class e-document-result-item__title) title_elem = item.select_one('a.e-document-result-item__title') if title_elem: # Get text content, clean it up title = title_elem.get_text(strip=True) # Remove SVG icon text and clean title = re.sub(r'\s* Optional[Drucksache]: """Get document metadata by drucksache ID (e.g. '18/8125').""" # Parse legislatur and number match = re.match(r"(\d+)/(\d+)", drucksache) if not match: return None legislatur, nummer = match.groups() pdf_url = f"https://www.landtag.nrw.de/portal/WWW/dokumentenarchiv/Dokument/MMD{legislatur}-{nummer}.pdf" # Try to fetch and extract basic info async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: resp = await client.head(pdf_url) if resp.status_code == 200: return Drucksache( drucksache=drucksache, title=f"Drucksache {drucksache}", fraktionen=[], datum="", link=pdf_url, bundesland="NRW", ) except: pass return None async def download_text(self, drucksache: str) -> Optional[str]: """Download PDF and extract text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc: return None async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: return None # Extract text with PyMuPDF pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception as e: print(f"Error downloading {drucksache}: {e}") return None class PortalaAdapter(ParlamentAdapter): """Adapter for portala/eUI-based parliament documentation systems. 
Used by parliaments running the proprietary "esearch" / portala framework (originally developed for STAR/StarFinder backends, now wrapped in a Single-Page App with Template Toolkit on the server side): - **LSA** (Sachsen-Anhalt) — PADOKA at ``padoka.landtag.sachsen-anhalt.de`` under ``/portal/`` (singular) - **BE** (Berlin) — PARDOK at ``pardok.parlament-berlin.de`` under ``/portala/`` (with the trailing 'a') Both instances share the same JSON action schema, only the base URL, the data source ID, the application path prefix and a few minor quirks differ — those are constructor parameters so that the same class can serve both states (and any future portala-based parliament). The search workflow is two-stage: 1. ``POST {base}{path}/browse.tt.json`` with a complex JSON ``action`` body that contains an Elasticsearch-style query tree under ``search.json``. The server returns a ``report_id`` plus hit count. 2. ``POST {base}{path}/report.tt.html`` with ``{report_id, start, chunksize}`` to fetch the HTML hit list. Each hit carries a Perl Data::Dumper block in a ``
<pre>`` tag with the canonical metadata.

    The query body schema was reverse-engineered from
    https://github.com/okfde/dokukratie/blob/main/dokukratie/scrapers/portala.query.json
    (GPL-3.0 — only structure/selectors are reused, not Python code).

    Full-text search is **not** implemented in the MVP: the adapter
    returns documents of the current Wahlperiode in the given date
    window, and the search query is applied as a client-side
    title/Urheber filter. The server-side full-text path requires
    state-specific ``sf`` index names that are not yet known.
    """

    def __init__(
        self,
        *,
        bundesland: str,
        name: str,
        base_url: str,
        db_id: str,
        wahlperiode: int,
        portala_path: str = "/portal",
        document_type: Optional[str] = "Antrag",
        pdf_url_prefix: str = "/files/",
        date_window_days: int = 730,
    ) -> None:
        """Configure a portala/eUI adapter for one specific parliament.

        Args:
            bundesland: state code (e.g. ``"LSA"``, ``"BE"``).
            name: human-readable adapter label (used in logs/UI).
            base_url: ``https://...`` of the portal host without trailing slash.
            db_id: data source identifier the eUI server expects in
                ``action.sources``, e.g. ``"lsa.lissh"`` or ``"lah.lissh"``.
            wahlperiode: current legislative period — fed into the WP
                term of the search tree.
            portala_path: path prefix where the portala app lives. ``/portal``
                for LSA, ``/portala`` for Berlin.
            document_type: optional filter applied via ETYPF/DTYPF/DART
                terms. ``"Antrag"`` works for LSA; for instances where
                the index uses different document_type values (e.g. Berlin),
                pass ``None`` to drop the document_type subtree entirely
                — the user can still filter client-side by title.
            pdf_url_prefix: URL fragment between ``base_url`` and the
                relative PDF path returned by the server.
            date_window_days: how many days back ``search()`` looks by
                default.
        """
        self.bundesland = bundesland
        self.name = name
        self.base_url = base_url.rstrip("/")
        self.db_id = db_id
        self.wahlperiode = wahlperiode
        self.portala_path = "/" + portala_path.strip("/")
        self.document_type = document_type
        self.pdf_url_prefix = "/" + pdf_url_prefix.strip("/") + "/"
        self.date_window_days = date_window_days

    # ── LSA-style hit list (Perl Data::Dumper inside <pre> blocks) ──
    # Reverse-engineered "WEV*" record fields:
    # WEV06.main = title
    # WEV32.5    = relative PDF path
    # WEV32.main = "Antrag <Urheber> <DD.MM.YYYY> Drucksache X/YYYY ..."
    #
    # Title: the first 'main' value of the WEV06 entry; the value may be
    # single- or double-quoted in the dump.
    _RE_TITLE = re.compile(r"'WEV06'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
    # Relative PDF path: a single-quoted value under key '5' ending in ".pdf".
    # NOTE: the pattern is not anchored to WEV32 — it matches any
    # "'5' => '….pdf'" pair in the block it is applied to.
    _RE_PDF = re.compile(r"'5'\s*=>\s*'([^']*\.pdf)'")
    # Drucksachen number ("X/YYYY") following the literal word "Drucksache".
    _RE_DRUCKSACHE = re.compile(r"Drucksache\s*(\d+/\d+)")
    # Originator (group 1) and DD.MM.YYYY date (group 2) from WEV32.main.
    # NOTE: the literal "Antrag" prefix is part of the pattern, so records of
    # any other document type yield no originator/date match.
    _RE_URHEBER_DATUM = re.compile(
        r"'WEV32'\s*=>\s*\[\s*\{[^}]*'main'\s*=>\s*[\"']Antrag\s+(.+?)\s+(\d{1,2}\.\d{1,2}\.\d{4})\s+Drucksache",
    )
    _RE_PRE_BLOCK = re.compile(r'
\$VAR1 = (.*?)
', re.DOTALL) # ── Berlin-style hit list (production HTML cards, no Perl dump) ── # The whole div for one record: _RE_BE_RECORD = re.compile( r']*class="[^"]*efxRecordRepeater[^"]*"[^>]*data-efx-rec="[^"]*"[^>]*>(.*?)(?=]*efxRecordRepeater|]*id="efxResultsEnd"||$)', re.DOTALL, ) _RE_BE_TITLE = re.compile(r']*class="h5[^"]*"[^>]*>\s*([^<]+)') _RE_BE_LINK = re.compile(r']*href="([^"]+\.pdf)"[^>]*>') # The metadata h6 looks like: # Antrag (Eilantrag)  Drucksache 19/3104 S. 1 bis 24 vom 31.03.2026 _RE_BE_DRUCKSACHE = re.compile(r'Drucksache\s+(\d+/\d+)') _RE_BE_DATUM = re.compile(r'vom\s+(\d{1,2}\.\d{1,2}\.\d{4})') _RE_BE_DOCTYPE = re.compile(r'\s*([^<&]+?)(?: |<)') @staticmethod def _decode_perl_hex(s: str) -> str: """Decode \\x{abcd} escape sequences from Perl Data::Dumper output.""" return re.sub(r'\\x\{([0-9a-f]+)\}', lambda m: chr(int(m.group(1), 16)), s) @staticmethod def _normalize_fraktion(urheber: str) -> list[str]: """Map Urheber-String to canonical fraction codes. Uses regex word boundaries instead of plain substring matching so that comma-separated lists ("CDU, SPD") and the embedded "DIE LINKE" are matched reliably. """ u = urheber.upper() out: list[str] = [] def has(pattern: str) -> bool: return re.search(pattern, u) is not None if has(r"\bBÜNDNIS\s*90\b") or has(r"\bGR(?:Ü|UE)NE\b"): out.append("GRÜNE") if has(r"\bCDU\b"): out.append("CDU") if has(r"\bSPD\b"): out.append("SPD") if has(r"\bFDP\b"): out.append("FDP") if has(r"\bAFD\b"): out.append("AfD") if has(r"\bLINKE\b"): out.append("LINKE") if has(r"\bBSW\b"): out.append("BSW") if has(r"LANDESREGIERUNG|SENAT VON BERLIN|REGIERENDE[RN]?\s+BÜRGERMEISTER|MINISTER\b|STAATSKANZLEI"): out.append("Landesregierung") return out def _build_search_body( self, wahlperiode: int, start_date: str, end_date: str, ) -> dict: """Build the action JSON body for browse.tt.json. The schema is taken from dokukratie's portala.query.json template and only differs in the data source and the variable substitutions. 
When ``self.document_type`` is None, the ETYPF/DTYPF/DART subtree is dropped — useful for parliaments whose ETYPF index uses different value strings than ``"Antrag"``. """ document_type = self.document_type date_range_text = f"{start_date} THRU {end_date}" date_term = lambda sf, num: { # noqa: E731 — local helper "tn": "trange", "sf": sf, "op": "eq", "num": num, "idx": 119, "l": 3, "p1": start_date, "t1": start_date, "p2": end_date, "t2": end_date, "t": date_range_text, } # Build the search.lines (form-state mirror) and the json tree lines: dict = { "2": str(wahlperiode), "10": start_date, "11": end_date, "20.1": "alWEBBI", "20.2": "alWEBBI", "20.3": "alWEBBI", "90.1": "AND", "90.2": "AND", "90.3": "AND", } if document_type is not None: lines["3"] = document_type lines["4"] = "D" # Top-level AND tree top_terms: list = [ {"tn": "term", "t": str(wahlperiode), "idx": 6, "l": 3, "sf": "WP", "op": "eq", "num": 5}, ] if document_type is not None: top_terms.append({"tn": "or", "num": 3, "terms": [ {"tn": "or", "num": 4, "terms": [ {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "ETYPF", "op": "eq", "num": 10}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "ETYP2F", "op": "eq", "num": 11}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "DTYPF", "op": "eq", "num": 12}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "DTYP2F", "op": "eq", "num": 13}, {"tn": "term", "t": f'"{document_type}"', "idx": 50, "l": 4, "sf": "1VTYPF", "op": "eq", "num": 14}, ]}, {"tn": "or", "num": 15, "terms": [ {"tn": "term", "t": '"D"', "idx": 93, "l": 4, "sf": "DART", "op": "eq", "num": 16}, {"tn": "term", "t": '"D"', "idx": 93, "l": 4, "sf": "DARTS", "op": "eq", "num": 17}, ]}, ]}) top_terms.append({"tn": "or", "num": 18, "terms": [ {"tn": "or", "num": 19, "terms": [ date_term("DAT", 20), date_term("DDAT", 21), ]}, date_term("SDAT", 22), ]}) top_terms.append({"tn": "term", "t": "DOKDBE", "idx": 156, "l": 1, "sf": 
"TYP", "op": "eq", "num": 23}) # Mirror the same shape into the parsed/sref display strings if document_type is not None: parsed = ( f"((/WP {wahlperiode}) AND " f"(/ETYPF,ETYP2F,DTYPF,DTYP2F,1VTYPF (\"{document_type}\")) " f"AND (/DART,DARTS (\"D\")) AND " f"(DAT,DDAT,SDAT= {date_range_text})) AND TYP=DOKDBE" ) else: parsed = ( f"((/WP {wahlperiode}) AND " f"(DAT,DDAT,SDAT= {date_range_text})) AND TYP=DOKDBE" ) return { "action": "SearchAndDisplay", "sources": [self.db_id], "report": { "rhl": "main", "rhlmode": "add", "format": "generic1-full", "mime": "html", "sort": "WEVSO1/D WEVSO2 WEVSO3", }, "search": { "lines": lines, "serverrecordname": "sr_generic1", "parsed": parsed, "sref": parsed, "json": [{ "tn": "and", "num": 1, "terms": top_terms, }], }, "dataSet": "1", } @staticmethod def _datum_de_to_iso(datum_de: str) -> str: """Convert DD.MM.YYYY → YYYY-MM-DD; return '' for empty input.""" if not datum_de: return "" d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" def _parse_hit_list_html(self, html: str, query_filter: str = "") -> list[Drucksache]: """Extract Drucksachen from a report.tt.html response. Two formats are supported and auto-detected: - **LSA-style:** the records are embedded as Perl Data::Dumper dumps inside ``
<pre>$VAR1 = …</pre>
`` blocks. WEV06 → title, WEV32 → metadata + PDF path. Used by Sachsen-Anhalt's PADOKA template. - **Berlin-style:** standard production HTML cards with ``efxRecordRepeater`` divs. Title in an ``

``, metadata + PDF link in an ````. Used by Berlin's PARDOK template. """ if self._RE_PRE_BLOCK.search(html): return self._parse_hit_list_dump(html, query_filter) return self._parse_hit_list_cards(html, query_filter) def _parse_hit_list_dump(self, html: str, query_filter: str) -> list[Drucksache]: """Parse LSA-style ``
<pre>$VAR1 = …</pre>
`` Perl-dump records.""" results: list[Drucksache] = [] for pre in self._RE_PRE_BLOCK.findall(html): m_ds = self._RE_DRUCKSACHE.search(pre) if not m_ds: continue drucksache = m_ds.group(1) m_t = self._RE_TITLE.search(pre) title = self._decode_perl_hex(m_t.group(1)) if m_t else f"Drucksache {drucksache}" m_pdf = self._RE_PDF.search(pre) pdf_rel = m_pdf.group(1) if m_pdf else "" pdf_url = f"{self.base_url}{self.pdf_url_prefix}{pdf_rel}" if pdf_rel else "" m_w32 = self._RE_URHEBER_DATUM.search(pre) urheber = self._decode_perl_hex(m_w32.group(1).strip()) if m_w32 else "" datum_iso = self._datum_de_to_iso(m_w32.group(2) if m_w32 else "") fraktionen = self._normalize_fraktion(urheber) if urheber else [] doc = Drucksache( drucksache=drucksache, title=title, fraktionen=fraktionen, datum=datum_iso, link=pdf_url, bundesland=self.bundesland, typ="Antrag", ) if query_filter: hay = f"{title} {urheber}".lower() if not all(t in hay for t in query_filter.lower().split()): continue results.append(doc) return results def _parse_hit_list_cards(self, html: str, query_filter: str) -> list[Drucksache]: """Parse Berlin-style ``efxRecordRepeater`` HTML-card records. Each card contains an ``

`` title, a metadata ```` with the document type, the Drucksachen-Nummer, and the date, plus a direct ```` link to the PDF on the same host. """ results: list[Drucksache] = [] # Split the HTML on every record-div opener — easier than balancing # divs with regex. chunks = html.split('class="record') # First chunk is the prelude, skip it for chunk in chunks[1:]: # Each chunk now starts at the record class attribute m_t = self._RE_BE_TITLE.search(chunk) title = m_t.group(1).strip() if m_t else "Ohne Titel" m_ds = self._RE_BE_DRUCKSACHE.search(chunk) if not m_ds: continue drucksache = m_ds.group(1) m_pdf = self._RE_BE_LINK.search(chunk) pdf_url = "" if m_pdf: href = m_pdf.group(1) if href.startswith("http://") or href.startswith("https://"): pdf_url = href elif href.startswith("/"): pdf_url = f"{self.base_url}{href}" else: pdf_url = f"{self.base_url}{self.pdf_url_prefix}{href}" m_dat = self._RE_BE_DATUM.search(chunk) datum_iso = self._datum_de_to_iso(m_dat.group(1) if m_dat else "") m_doc = self._RE_BE_DOCTYPE.search(chunk) doctype_full = m_doc.group(1).strip() if m_doc else "Drucksache" # Berlin often packs the originator(s) into the same h6 line: # "Antrag CDU, SPD" → fraktionen = ["CDU","SPD"], typ = "Antrag" # Senat-Vorlagen carry no fraction, only "Vorlage zur …". fraktionen = self._normalize_fraktion(doctype_full) # Strip the fraction names back out of the typ string so the UI # shows a clean "Antrag" / "Vorlage …" label. 
typ = doctype_full if fraktionen: # Cut at the first occurrence of any party name cuts = [typ.upper().find(f.upper()) for f in fraktionen] cuts = [c for c in cuts if c >= 0] if cuts: typ = typ[: min(cuts)].rstrip(" ,") doc = Drucksache( drucksache=drucksache, title=title, fraktionen=fraktionen, datum=datum_iso, link=pdf_url, bundesland=self.bundesland, typ=typ, ) if query_filter: hay = f"{title} {doctype_full}".lower() if not all(t in hay for t in query_filter.lower().split()): continue results.append(doc) return results async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search recent documents of the current Wahlperiode. ``query`` is applied as a client-side title/Urheber filter; the server-side query covers the configured ``date_window_days`` (default 24 months). """ from datetime import date, timedelta end = date.today() start = end - timedelta(days=self.date_window_days) body = self._build_search_body( wahlperiode=self.wahlperiode, start_date=start.isoformat(), end_date=end.isoformat(), ) browse_html = f"{self.base_url}{self.portala_path}/browse.tt.html" browse_json = f"{self.base_url}{self.portala_path}/browse.tt.json" report_html = f"{self.base_url}{self.portala_path}/report.tt.html" async with httpx.AsyncClient( timeout=30, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: # Step 1: warm up cookies via the browse page await client.get(browse_html) # Step 2: submit the search action resp = await client.post( browse_json, json=body, headers={"Referer": browse_html}, ) if resp.status_code != 200: logger.error("%s search HTTP %s", self.bundesland, resp.status_code) return [] data = resp.json() report_id = data.get("report_id") if not report_id: logger.error("%s: no report_id in response: %s", self.bundesland, data) return [] # Step 3: fetch the HTML hit list # Take a generous chunk so client-side filter still has enough chunksize = 100 if query else limit report_resp = await client.post( 
report_html, json={"report_id": report_id, "start": 0, "chunksize": chunksize}, headers={"Referer": browse_html}, ) if report_resp.status_code != 200: logger.error("%s report HTTP %s", self.bundesland, report_resp.status_code) return [] results = self._parse_hit_list_html(report_resp.text, query_filter=query) return results[:limit] except Exception: logger.exception("%s search error", self.bundesland) return [] async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Look up a single document by ID via the search endpoint with a document_number filter.""" # Pragmatic MVP: do a broad search and filter for the requested ID. # A targeted single-document fetch would require a different # action.search.json structure that we have not reverse-engineered yet. results = await self.search(query="", limit=200) for doc in results: if doc.drucksache == drucksache: return doc return None async def download_text(self, drucksache: str) -> Optional[str]: """Download the PDF for a Drucksache and extract its text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc or not doc.link: return None async with httpx.AsyncClient( timeout=60, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: return None pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception: logger.exception("%s download error for %s", self.bundesland, drucksache) return None class ParLDokAdapter(ParlamentAdapter): """Adapter for ParlDok 8.x parliament documentation systems (J3S GmbH). ParlDok is a proprietary parliament documentation product by J3S GmbH (https://www.j3s.de). Different from the portala/eUI framework used by LSA/BE: ParlDok 8.x is a single-page app whose backend is a JSON API rooted at ``{base_url}{prefix}/Fulltext/...``. 
The legacy ParLDok 5.x HTML POST form (``parldok/formalkriterien``) used by dokukratie's MV YAML scraper has been deprecated by the LandtagMV upgrade to 8.3.5. Confirmed instances using this engine (April 2026): - **MV** (Mecklenburg-Vorpommern) — ``dokumentation.landtag-mv.de/parldok`` - HH, SN, TH all advertise ParlDok in dokukratie but their actual versions/themes have not been verified yet. Search workflow: 1. ``GET {base_url}{prefix}/`` to obtain the session cookie. The backend rejects POSTs without it. 2. ``POST {base_url}{prefix}/Fulltext/Search`` with form-encoded ``data=`` payload. The JSON carries a ``tags`` array of facet selections; each tag is ``{"type": , "id": }``. Reverse-engineered facet type constants from the bundle.js (``pd.facet_*``): - ``facet_fraction = 2`` - ``facet_kind = 7`` (Drucksache, Plenarprotokoll, …) - ``facet_type = 8`` (Antrag, Gesetzentwurf, Kleine Anfrage, …) - ``facet_lp = 10`` (Wahlperiode) Response is JSON ``{success, data: }`` where the inner ``data`` carries ``{count, docs: [{id, title, date, authorhtml, kind, type, lp, number, link, ...}], ...}``. 3. PDF download: ``GET {base_url}{prefix}/dokument/{numeric_id}``. Returns ``application/pdf`` directly. The ``link`` field returned by the search API already contains the path fragment ``/dokument/#navpanes=0`` — strip the fragment and prepend the configured ``prefix``. Drucksachen-Nummer is reconstructed as ``f"{lp}/{number}"`` from the search hit. Full-text search is *not* implemented in this MVP — the backend supports it via ``facet_fulltext = 0`` tags but the public LP-only filter already returns the relevant Antrag pool. ``query`` is applied as a client-side title/Urheber filter. """ # Reverse-engineered facet type constants from bundle.js (pd.facet_*). 
FACET_FULLTEXT = 0 FACET_FRACTION = 2 FACET_KIND = 7 FACET_TYPE = 8 FACET_LP = 10 def __init__( self, *, bundesland: str, name: str, base_url: str, wahlperiode: int, prefix: str = "/parldok", document_typ: str = "Antrag", ) -> None: """Configure a ParlDok 8.x adapter for one specific parliament. Args: bundesland: state code, e.g. ``"MV"``. name: human-readable label. base_url: ``https://...`` host root, no trailing slash. wahlperiode: current legislative period — fed into the ``facet_lp`` tag of the search payload. prefix: app prefix where ParlDok lives. ``/parldok`` for MV. document_typ: client-side filter on the ``type`` field of each hit ("Antrag", "Gesetzentwurf", …). Set to empty string to disable type filtering. """ self.bundesland = bundesland self.name = name self.base_url = base_url.rstrip("/") self.prefix = "/" + prefix.strip("/") self.wahlperiode = wahlperiode self.document_typ = document_typ @staticmethod def _datum_de_to_iso(datum_de: str) -> str: """DD.MM.YYYY → YYYY-MM-DD; '' for empty input.""" if not datum_de: return "" try: d, m, y = datum_de.split(".") return f"{y}-{m.zfill(2)}-{d.zfill(2)}" except ValueError: return "" @staticmethod def _normalize_fraktion(authorhtml: str) -> list[str]: """Map ParlDok ``authorhtml`` to canonical fraction codes. ``authorhtml`` may be a comma-separated list of fractions ("CDU, SPD, F.D.P."), a single MdL with party in parens ("Thomas de Jesus Fernandes (AfD)") or empty (Landesregierung). """ if not authorhtml: return [] u = authorhtml.upper() out: list[str] = [] if re.search(r"\bBÜNDNIS\s*90\b", u) or re.search(r"\bGR(?:Ü|UE)NE\b", u): out.append("GRÜNE") if re.search(r"\bCDU\b", u): out.append("CDU") if re.search(r"\bSPD\b", u): out.append("SPD") # F.D.P. 
(with dots, historical) and FDP both occur in MV if re.search(r"\bF\.?\s*D\.?\s*P\.?\b", u): out.append("FDP") if re.search(r"\bAFD\b", u): out.append("AfD") if re.search(r"\bLINKE\b", u) or re.search(r"\bLL/PDS\b", u): out.append("LINKE") if re.search(r"\bBSW\b", u): out.append("BSW") if re.search(r"LANDESREGIERUNG|MINISTER\b|STAATSKANZLEI|MINISTERPRÄSIDENT", u): out.append("Landesregierung") return out @staticmethod def _fulltext_id(term: str) -> str: """Sanitize a search term to ParlDok's facet ID format. Mirrors ``pd.getFulltextId`` from ``bundle.js``: replace every non-alphanumeric character with ``-``. The server uses this to deduplicate identical search facets. """ return re.sub(r"[^a-zA-Z0-9]", "-", term) def _build_search_body(self, *, length: int = 100, query: str = "") -> dict: """Build the JSON payload for the initial ``Fulltext/Search`` call. Filters by Wahlperiode + optional server-side full-text search. Type/kind filtering still happens client-side because the facet_type/facet_kind value IDs are instance-specific and would require an extra ``Fulltext/Filter`` round trip to discover. Pagination beyond the first page goes through ``Fulltext/Resultpage`` — the ``Search`` endpoint itself ignores any non-zero ``Start``. The full-text tag schema is reverse-engineered from ``pd.addInput`` in ``bundle.js`` and matches the SPA payload verbatim:: {"type": 0, "id": "", "fulltext": "", "label": "", "field": "Alle"} ``field="Alle"`` means "search all indexed fields" (``pd.currentFTSearchMode`` default). The server tokenizes the term and applies AND-semantics across whitespace. """ tags: list[dict] = [{"type": self.FACET_LP, "id": self.wahlperiode}] if query: tags.append({ "type": self.FACET_FULLTEXT, "id": self._fulltext_id(query), "fulltext": query, "label": query, "field": "Alle", }) return { "devicekey": "", "max": length, "withfilter": False, # sort=2 → newest first (date desc); sort=1 is relevance. 
"sort": 2, "topk": length, "llm": 0, "newdocsearch": False, "limit": {"Start": 0, "Length": length}, "tags": tags, "updateFilters": [], } def _hit_to_drucksache(self, hit: dict) -> Optional[Drucksache]: """Convert one ParlDok JSON hit to a Drucksache. None if unusable.""" lp = hit.get("lp") number = hit.get("number") if not lp or not number: return None link_field = hit.get("link") or hit.get("prelink") or "" # Strip "#navpanes=0" fragment and prepend the prefix. path = link_field.split("#", 1)[0] pdf_url = f"{self.base_url}{self.prefix}{path}" if path else "" return Drucksache( drucksache=f"{lp}/{number}", title=hit.get("title", ""), fraktionen=self._normalize_fraktion(hit.get("authorhtml", "")), datum=self._datum_de_to_iso(hit.get("date", "")), link=pdf_url, bundesland=self.bundesland, typ=hit.get("type", "") or hit.get("kind", ""), ) async def _post_json( self, client: httpx.AsyncClient, endpoint: str, payload: dict, ) -> Optional[dict]: """POST a JSON-stringified payload to a ParlDok endpoint. ``endpoint`` is the path tail (e.g. ``"Fulltext/Search"`` or ``"Fulltext/Resultpage"``). Returns the inner JSON object (already parsed from the stringified ``data`` field), or None on error. 
""" homepage = f"{self.base_url}{self.prefix}/" url = f"{self.base_url}{self.prefix}/{endpoint}" try: resp = await client.post( url, data={"data": json.dumps(payload, ensure_ascii=False)}, headers={ "X-Requested-With": "XMLHttpRequest", "Referer": homepage, }, ) if resp.status_code != 200: logger.error( "%s %s HTTP %s", self.bundesland, endpoint, resp.status_code, ) return None outer = resp.json() if not outer.get("success"): logger.error( "%s %s not successful: %s", self.bundesland, endpoint, outer.get("message"), ) return None return json.loads(outer["data"]) except Exception: logger.exception("%s ParlDok %s error", self.bundesland, endpoint) return None async def _initial_search( self, client: httpx.AsyncClient, *, length: int, query: str = "", ) -> tuple[Optional[int], list[dict]]: """Run the initial ``Fulltext/Search`` and return ``(queryid, docs)``. The ``queryid`` is needed for subsequent ``Fulltext/Resultpage`` calls. ParlDok ignores any non-zero ``Start`` on this endpoint — the first 100 hits are the only ones reachable via ``Search``. ``query`` is sent server-side as a ``facet_fulltext`` tag — see ``_build_search_body``. 
""" body = self._build_search_body(length=length, query=query) inner = await self._post_json(client, "Fulltext/Search", body) if not inner: return None, [] return inner.get("queryid"), (inner.get("docs") or []) async def _result_page( self, client: httpx.AsyncClient, *, queryid: int, start: int, length: int, ) -> list[dict]: """Fetch a further result page via ``Fulltext/Resultpage``.""" payload = { "devicekey": "", "queryid": queryid, "limit": {"Start": start, "Length": length}, } inner = await self._post_json(client, "Fulltext/Resultpage", payload) if not inner: return [] return inner.get("docs") or [] def _make_client(self) -> httpx.AsyncClient: return httpx.AsyncClient( timeout=30, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) async def _paginated_hits( self, client: httpx.AsyncClient, *, query: str = "", ): """Async iterator over Drucksachen-style hits across all pages. Yields raw hit dicts in newest-first order. The first batch comes from ``Fulltext/Search``, subsequent batches from ``Fulltext/Resultpage`` using the queryid the server returned for the initial call. Stops when a page comes back empty, undersized, or after ``MAX_PAGES`` iterations. ``query`` is forwarded as a server-side full-text filter to ``_initial_search``; the resulting ``queryid`` is bound to that filter, so subsequent ``Resultpage`` calls automatically inherit it without needing to repeat the tag. """ queryid, hits = await self._initial_search( client, length=self.PAGE_SIZE, query=query, ) for hit in hits: yield hit if not queryid or len(hits) < self.PAGE_SIZE: return for page in range(1, self.MAX_PAGES): page_hits = await self._result_page( client, queryid=queryid, start=page * self.PAGE_SIZE, length=self.PAGE_SIZE, ) if not page_hits: return for hit in page_hits: yield hit if len(page_hits) < self.PAGE_SIZE: return # ParlDok 8.x caps Length per request at 100 — paginate if needed. 
PAGE_SIZE = 100 # Safety bound: scan at most 10 pages × 100 = 1000 most recent docs. # Anträge are ~3% of all hits in MV, so 1000 raw → ~30 Anträge, more # than enough for the typical UI request (limit 5..20). Filtered # queries that find nothing in the last 1000 docs return empty # rather than scan the entire WP. MAX_PAGES = 10 async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search recent documents of the configured Wahlperiode. Server-side full-text search via the ``facet_fulltext`` tag (#12) when ``query`` is non-empty; otherwise pure browse mode. The server returns the WP sorted newest-first across all document kinds, the client keeps only ``Antrag``-typed Drucksachen and dedupes by lp/number (ParlDok returns the same Drucksache multiple times when it appears in several Vorgänge/Beratungen). Pagination: ParlDok caps each response at 100 rows; further pages come from ``Fulltext/Resultpage`` bound to the server-assigned ``queryid``. """ results: list[Drucksache] = [] seen: set[str] = set() async with self._make_client() as client: await client.get(f"{self.base_url}{self.prefix}/") async for hit in self._paginated_hits(client, query=query): if hit.get("kind") != "Drucksache": continue if self.document_typ and hit.get("type") != self.document_typ: continue doc = self._hit_to_drucksache(hit) if not doc: continue if doc.drucksache in seen: continue seen.add(doc.drucksache) results.append(doc) if len(results) >= limit: return results return results async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Look up a single Antrag by ``lp/number`` ID. Pragmatic MVP: page through the WP unfiltered until we find a match. ParlDok offers a ``facet_number`` (14) facet that would let us target the lookup directly, but the facet ID values are instance-specific (would require a ``Fulltext/Filter`` discovery call) and the WP-wide pagination is fast enough for the typical 2k–10k Drucksachen per period. 
""" wanted_lp, wanted_num = (drucksache.split("/", 1) + [""])[:2] if not wanted_num: return None async with self._make_client() as client: await client.get(f"{self.base_url}{self.prefix}/") async for hit in self._paginated_hits(client): if hit.get("kind") != "Drucksache": continue if str(hit.get("lp")) == wanted_lp and str(hit.get("number")) == wanted_num: return self._hit_to_drucksache(hit) return None async def download_text(self, drucksache: str) -> Optional[str]: """Download the PDF for a Drucksache and extract its text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc or not doc.link: return None async with httpx.AsyncClient( timeout=60, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"}, ) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: logger.error( "%s PDF HTTP %s for %s (%s)", self.bundesland, resp.status_code, drucksache, doc.link, ) return None pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception: logger.exception("%s ParlDok download error for %s", self.bundesland, drucksache) return None class BayernAdapter(ParlamentAdapter): """Adapter for Bayerischer Landtag.""" bundesland = "BY" name = "Bayerischer Landtag" base_url = "https://www.bayern.landtag.de" async def search(self, query: str, limit: int = 20) -> list[Drucksache]: # TODO: Implement Bayern search return [] async def get_document(self, drucksache: str) -> Optional[Drucksache]: # TODO: Implement return None async def download_text(self, drucksache: str) -> Optional[str]: return None class BWAdapter(ParlamentAdapter): """Adapter for Baden-Württemberg Landtag.""" bundesland = "BW" name = "Landtag Baden-Württemberg" base_url = "https://www.landtag-bw.de" async def search(self, query: str, limit: int = 20) -> list[Drucksache]: # TODO: Implement BW search return [] async def get_document(self, drucksache: str) -> 
Optional[Drucksache]: return None async def download_text(self, drucksache: str) -> Optional[str]: return None # Registry of adapters ADAPTERS = { "NRW": NRWAdapter(), "LSA": PortalaAdapter( bundesland="LSA", name="Landtag von Sachsen-Anhalt (PADOKA)", base_url="https://padoka.landtag.sachsen-anhalt.de", db_id="lsa.lissh", wahlperiode=8, portala_path="/portal", document_type="Antrag", pdf_url_prefix="/files/", ), "BE": PortalaAdapter( bundesland="BE", name="Abgeordnetenhaus von Berlin (PARDOK)", base_url="https://pardok.parlament-berlin.de", db_id="lah.lissh", wahlperiode=19, portala_path="/portala", # Berlin's ETYPF index uses different value strings — drop the # document_type subtree, fall back to client-side title filter. document_type=None, # Tighter date window: BE has ~10x more documents than LSA, so a # narrower window keeps the per-request payload bounded. date_window_days=180, pdf_url_prefix="/files/", ), "MV": ParLDokAdapter( bundesland="MV", name="Landtag Mecklenburg-Vorpommern (ParlDok)", base_url="https://www.dokumentation.landtag-mv.de", wahlperiode=8, prefix="/parldok", document_typ="Antrag", ), "BY": BayernAdapter(), "BW": BWAdapter(), } def get_adapter(bundesland: str) -> Optional[ParlamentAdapter]: """Get adapter for a bundesland.""" return ADAPTERS.get(bundesland) async def search_all(query: str, bundesland: str = "NRW", limit: int = 20) -> list[Drucksache]: """Search parliament documents in a specific state.""" adapter = get_adapter(bundesland) if not adapter: return [] return await adapter.search(query, limit)