gwoe-antragspruefer/app/parlamente.py
Dotty Dotter eb045d0ed3 Phase B: party-name mapper #55 (Roadmap #59)
Central `app/parteien.py` as the single source of truth for party
resolution (usage sketch after the list):

- `PARTEIEN` table with a canonical key, a long display name, all
  known aliases, an optional `bundesland_scope` and a government
  marker. 14 entries (CDU, CSU, SPD, GRÜNE, FDP, LINKE, AfD, BSW, SSW,
  BiW plus the Freie-Wähler family BVB-FW, FW-BAYERN, FW-SL and the
  generic FREIE WÄHLER entry).
- `normalize_partei(raw, *, bundesland=None)` for single-string lookups
  with government precedence and FW-family disambiguation.
- `extract_fraktionen(text, *, bundesland=None)` as the funnel for the
  four old adapter helpers. Comma-separated lists, MdL names with the
  party in parentheses, HTML leftovers: everything flows through one
  place, with a Bundesland-scope filter (SSW only in SH, BVB-FW only
  in BB, etc.).
- `display_name(canonical, *, long=False)` for UI/PDF: the short form
  stays the canonical key, the long form is "BÜNDNIS 90/DIE GRÜNEN"
  instead of "GRÜNE", etc.

Adapter migration in `app/parlamente.py`:

- Four nearly identical `_normalize_fraktion()` methods in
  PortalaAdapter, ParLDokAdapter, StarFinderCGIAdapter and
  PARLISAdapter replaced by a one-line shim that calls
  `extract_fraktionen` with `self.bundesland` (sketched below).
  Removes ~120 lines of duplication.
- `@staticmethod` dropped because the FW disambiguation now needs
  `self.bundesland`; all callers already used `self._...`, so no
  call-site changes were necessary.
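
The shim, as it now reads in the PortalaAdapter further down this file
(docstring elided; the other three adapters differ only in the parameter
name):

    def _normalize_fraktion(self, urheber: str) -> list[str]:
        from .parteien import extract_fraktionen
        return extract_fraktionen(urheber, bundesland=self.bundesland)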

Workaround hack removed from `app/embeddings.py:496`:

- `partei.upper() if partei != "GRÜNE" else "GRÜNE"` replaced by a call
  to the central `normalize_partei()`. The hack was a tell-tale sign
  that the party spelling could drift somewhere between adapter and
  embedding lookup; with the mapper the spelling is guaranteed to be
  canonical everywhere (before/after sketch below).
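
Schematically, with the surrounding variable names assumed (only the
quoted expression comes from the removed code):

    # before (app/embeddings.py:496) -- casing normalised ad hoc
    key = partei.upper() if partei != "GRÜNE" else "GRÜNE"

    # after -- one canonical spelling, shared with the adapters
    from app.parteien import normalize_partei
    key = normalize_partei(partei)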

Tests:

- New `tests/test_parteien.py` with 52 cases: single lookups, FW
  disambiguation (BVB/Bayern/Saarland/RP), full-text extraction,
  government markers, table consistency.
- Test class in `tests/test_parlamente.py` rewritten: instead of the 6
  static `PortalaAdapter._normalize_fraktion(...)` tests there are now
  4 roundtrip tests over real adapter instances, including an explicit
  BB→BVB-FW vs. RP→FREIE WÄHLER verification (sketched below).
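
A sketch of the BB-vs-RP roundtrip check, assuming pytest and the
`ADAPTERS` registry at the bottom of this file; the test name, input
strings and expected keys follow the description above but are
illustrative, not copied from the actual test file:

    from app.parlamente import ADAPTERS

    def test_fw_disambiguation_bb_vs_rp():
        bb, rp = ADAPTERS["BB"], ADAPTERS["RP"]
        assert bb._normalize_fraktion("Antrag der Fraktion BVB / FREIE WÄHLER") == ["BVB-FW"]
        assert rp._normalize_fraktion("Antrag der Fraktion FREIE WÄHLER") == ["FREIE WÄHLER"]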

157 unit tests green (105 old + 52 new). Backwards compatible: the
canonical keys are exactly the strings already stored in the DB, so no
migration step is needed.

Refs: #55, #59 (Phase B)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 11:22:13 +02:00

"""Parliament search adapters for different German states."""
import json
import logging
import httpx
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
@dataclass
class Drucksache:
"""A parliamentary document."""
drucksache: str # e.g. "18/8125"
title: str
fraktionen: list[str]
datum: str # ISO date
link: str # PDF URL
bundesland: str
typ: str = "Antrag" # Antrag, Anfrage, Beschlussempfehlung, etc.
class ParlamentAdapter(ABC):
"""Base adapter for searching parliament documents."""
bundesland: str
name: str
@abstractmethod
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
"""Search for documents matching query."""
pass
@abstractmethod
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Get a specific document by ID."""
pass
@abstractmethod
async def download_text(self, drucksache: str) -> Optional[str]:
"""Download and extract text from a document."""
pass
class NRWAdapter(ParlamentAdapter):
"""Adapter for NRW Landtag (opal.landtag.nrw.de)."""
bundesland = "NRW"
name = "Landtag Nordrhein-Westfalen"
base_url = "https://opal.landtag.nrw.de"
search_url = "https://opal.landtag.nrw.de/home/dokumente/dokumentensuche/parlamentsdokumente/aktuelle-dokumente.html"
def _parse_query(self, query: str) -> tuple[str, list[str], bool]:
"""
Parse search query for AND logic and exact phrases.
Returns: (search_term_for_api, filter_terms, is_exact)
Examples:
- 'Klimaschutz Energie' -> ('Klimaschutz', ['klimaschutz', 'energie'], False)
- '"Grüner Stahl"' -> ('Grüner Stahl', ['grüner stahl'], True)
- 'Klimaschutz "erneuerbare Energie"' -> ('Klimaschutz', ['klimaschutz', 'erneuerbare energie'], False)
"""
query = query.strip()
# Check for exact phrase (entire query in quotes)
if query.startswith('"') and query.endswith('"') and query.count('"') == 2:
exact = query[1:-1].strip()
return (exact, [exact.lower()], True)
# Extract quoted phrases and regular terms
import shlex
try:
parts = shlex.split(query)
except ValueError:
# Fallback for unbalanced quotes
parts = query.split()
if not parts:
return (query, [query.lower()], False)
# Use first term for API search, all terms for filtering
filter_terms = [p.lower() for p in parts]
return (parts[0], filter_terms, False)
def _matches_all_terms(self, doc: 'Drucksache', terms: list[str], is_exact: bool) -> bool:
"""Check if document matches all search terms (AND logic)."""
searchable = f"{doc.title} {doc.drucksache} {' '.join(doc.fraktionen)} {doc.typ}".lower()
if is_exact:
# Exact phrase must appear
return terms[0] in searchable
else:
# All terms must appear (AND)
return all(term in searchable for term in terms)
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
"""Search NRW Landtag documents via OPAL portal."""
results = []
# Parse query for AND logic
api_query, filter_terms, is_exact = self._parse_query(query)
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
try:
# First, get the page to establish session
initial = await client.get(self.search_url)
if initial.status_code != 200:
print(f"NRW search initial request failed: {initial.status_code}")
return []
# Parse for webflow token from pagination links
soup = BeautifulSoup(initial.text, 'html.parser')
# Find a pagination link to extract the webflow token
pagination_link = soup.select_one('a[href*="webflowexecution"]')
webflow_token = ""
webflow_execution = ""
if pagination_link:
href = pagination_link.get('href', '')
# Extract webflowToken and webflowexecution from URL
token_match = re.search(r'webflowToken=([^&]*)', href)
exec_match = re.search(r'(webflowexecution[^=]+)=([^&]+)', href)
if token_match:
webflow_token = token_match.group(1)
if exec_match:
webflow_execution = f"{exec_match.group(1)}={exec_match.group(2)}"
# Now perform the search with POST
# Find the form action URL with webflow token
form = soup.select_one('form#docSearchByItem')
form_action = self.search_url
if form and form.get('action'):
action = form.get('action')
if action.startswith('/'):
form_action = f"{self.base_url}{action}"
elif action.startswith('http'):
form_action = action
else:
form_action = f"{self.search_url}?{action}"
# Build form data for "Einfache Suche" (searchByItem form)
form_data = {
'_eventId_sendform': '1',
'dokNum': api_query, # This is the text search field
'formId': 'searchByItem',
'dokTyp': '', # All types
'wp': '18', # Wahlperiode 18
}
# POST request with form data to the form action URL
search_resp = await client.post(
form_action,
data=form_data,
cookies=initial.cookies,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if search_resp.status_code != 200:
print(f"NRW search request failed: {search_resp.status_code}")
return []
# Parse results
soup = BeautifulSoup(search_resp.text, 'html.parser')
# Find all document result items (li elements containing articles)
items = soup.select('li:has(article)')
for item in items[:limit]:
try:
# Extract drucksache number from first link
num_link = item.select_one('a[href*="MMD"]')
if not num_link:
continue
href = num_link.get('href', '')
# Extract number: MMD18-12345.pdf -> 18/12345
match = re.search(r'MMD(\d+)-(\d+)\.pdf', href)
if not match:
continue
legislatur, nummer = match.groups()
drucksache = f"{legislatur}/{nummer}"
pdf_url = f"https://www.landtag.nrw.de{href}" if href.startswith('/') else href
# Extract title from the title link (class e-document-result-item__title)
title_elem = item.select_one('a.e-document-result-item__title')
if title_elem:
# Get text content, clean it up
title = title_elem.get_text(strip=True)
# Remove SVG icon text and clean
title = re.sub(r'\s*<svg.*', '', title)
title = re.sub(r'\s+', ' ', title).strip()
else:
# Fallback: try to find any longer text
title = f"Drucksache {drucksache}"
# Clean up common artifacts
title = re.sub(r'\s*\(\s*externer Link.*?\)', '', title).strip()
# Extract type (Antrag, Kleine Anfrage, etc.)
typ_elem = item.select_one('.e-document-result-item__category')
typ = typ_elem.get_text(strip=True) if typ_elem else "Drucksache"
# Extract date
time_elem = item.select_one('time')
datum = ""
if time_elem:
datum_text = time_elem.get_text(strip=True)
# Convert DD.MM.YYYY to YYYY-MM-DD
date_match = re.match(r'(\d{2})\.(\d{2})\.(\d{4})', datum_text)
if date_match:
d, m, y = date_match.groups()
datum = f"{y}-{m}-{d}"
# Extract Urheber (fraktionen) - look for paragraph containing "Urheber:"
urheber_text = ""
for p in item.select('p'):
if 'Urheber:' in p.get_text():
urheber_text = p.get_text()
break
fraktionen = []
if urheber_text:
# Extract party names (SPD, CDU, GRÜNE, FDP, AfD)
for party in ['SPD', 'CDU', 'GRÜNE', 'Grüne', 'FDP', 'AfD']:
if party in urheber_text:
fraktionen.append(party.upper() if party.lower() != 'grüne' else 'GRÜNE')
doc = Drucksache(
drucksache=drucksache,
title=title,
fraktionen=fraktionen,
datum=datum,
link=pdf_url,
bundesland="NRW",
typ=typ,
)
# Apply AND filter (all terms must match)
if self._matches_all_terms(doc, filter_terms, is_exact):
results.append(doc)
except Exception as e:
print(f"Error parsing item: {e}")
continue
except Exception as e:
print(f"NRW search error: {e}")
return results
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Get document metadata by drucksache ID (e.g. '18/8125')."""
# Parse legislatur and number
match = re.match(r"(\d+)/(\d+)", drucksache)
if not match:
return None
legislatur, nummer = match.groups()
pdf_url = f"https://www.landtag.nrw.de/portal/WWW/dokumentenarchiv/Dokument/MMD{legislatur}-{nummer}.pdf"
# Try to fetch and extract basic info
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
try:
resp = await client.head(pdf_url)
if resp.status_code == 200:
return Drucksache(
drucksache=drucksache,
title=f"Drucksache {drucksache}",
fraktionen=[],
datum="",
link=pdf_url,
bundesland="NRW",
)
except Exception:
pass
return None
async def download_text(self, drucksache: str) -> Optional[str]:
"""Download PDF and extract text."""
import fitz # PyMuPDF
doc = await self.get_document(drucksache)
if not doc:
return None
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
try:
resp = await client.get(doc.link)
if resp.status_code != 200:
return None
# Extract text with PyMuPDF
pdf = fitz.open(stream=resp.content, filetype="pdf")
text = ""
for page in pdf:
text += page.get_text()
pdf.close()
return text
except Exception as e:
print(f"Error downloading {drucksache}: {e}")
return None
class PortalaAdapter(ParlamentAdapter):
"""Adapter for portala/eUI-based parliament documentation systems.
Used by parliaments running the proprietary "esearch" / portala framework
(originally developed for STAR/StarFinder backends, now wrapped in a
Single-Page App with Template Toolkit on the server side):
- **LSA** (Sachsen-Anhalt) — PADOKA at ``padoka.landtag.sachsen-anhalt.de``
under ``/portal/`` (singular)
- **BE** (Berlin) — PARDOK at ``pardok.parlament-berlin.de`` under
``/portala/`` (with the trailing 'a')
Both instances share the same JSON action schema, only the base URL,
the data source ID, the application path prefix and a few minor
quirks differ — those are constructor parameters so that the same
class can serve both states (and any future portala-based parliament).
The search workflow is two-stage:
1. ``POST {base}{path}/browse.tt.json`` with a complex JSON ``action``
body that contains an Elasticsearch-style query tree under
``search.json``. The server returns a ``report_id`` plus hit count.
2. ``POST {base}{path}/report.tt.html`` with ``{report_id, start,
chunksize}`` to fetch the HTML hit list. Each hit carries a Perl
Data::Dumper block in a ``<pre>`` tag with the canonical metadata.
The query body schema was reverse-engineered from
https://github.com/okfde/dokukratie/blob/main/dokukratie/scrapers/portala.query.json
(GPL-3.0 — only structure/selectors are reused, not Python code).
Full-text search is **not** implemented in the MVP: the adapter
returns documents of the current Wahlperiode in the given date
window, and the search query is applied as a client-side
title/Urheber filter. The server-side full-text path requires
state-specific ``sf`` index names that are not yet known.
"""
def __init__(
self,
*,
bundesland: str,
name: str,
base_url: str,
db_id: str,
wahlperiode: int,
portala_path: str = "/portal",
document_type: Optional[str] = "Antrag",
pdf_url_prefix: str = "/files/",
date_window_days: int = 730,
typ_filter: Optional[str] = "DOKDBE",
omit_date_filter: bool = False,
) -> None:
"""Configure a portala/eUI adapter for one specific parliament.
Args:
bundesland: state code (e.g. ``"LSA"``, ``"BE"``).
name: human-readable adapter label (used in logs/UI).
base_url: ``https://...`` of the portal host without trailing slash.
db_id: data source identifier the eUI server expects in
``action.sources``, e.g. ``"lsa.lissh"`` or ``"lah.lissh"``.
wahlperiode: current legislative period — fed into the WP
term of the search tree.
portala_path: path prefix where the portala app lives. ``/portal``
for LSA, ``/portala`` for Berlin.
document_type: optional filter applied via ETYPF/DTYPF/DART
terms. ``"Antrag"`` works for LSA; for instances where
the index uses different document_type values (e.g. Berlin),
pass ``None`` to drop the document_type subtree entirely
— the user can still filter client-side by title.
pdf_url_prefix: URL fragment between ``base_url`` and the
relative PDF path returned by the server.
date_window_days: how many days back ``search()`` looks by
default.
typ_filter: ``TYP=<value>`` term in the parsed string and
JSON tree. ``DOKDBE`` works for LSA/BE/BB/BW (the
lissh-style instances). For Hessen (``hlt.lis``) and
similar instances the value is different or absent —
pass ``None`` to drop the term entirely.
"""
self.bundesland = bundesland
self.name = name
self.base_url = base_url.rstrip("/")
self.db_id = db_id
self.wahlperiode = wahlperiode
self.portala_path = "/" + portala_path.strip("/")
self.document_type = document_type
self.pdf_url_prefix = "/" + pdf_url_prefix.strip("/") + "/"
self.date_window_days = date_window_days
self.typ_filter = typ_filter
self.omit_date_filter = omit_date_filter
# ── LSA-style hit list (Perl Data::Dumper inside <pre> blocks) ──
# Reverse-engineered "WEV*" record fields:
# WEV06.main = title
# WEV32.5 = relative PDF path
# WEV32.main = "Antrag <Urheber> <DD.MM.YYYY> Drucksache <b>X/YYYY</b> ..."
_RE_TITLE = re.compile(r"'WEV06'\s*=>\s*\[\s*\{\s*'main'\s*=>\s*[\"']([^\"']+)[\"']")
_RE_PDF = re.compile(r"'5'\s*=>\s*'([^']*\.pdf)'")
_RE_DRUCKSACHE = re.compile(r"Drucksache\s*<b>(\d+/\d+)</b>")
_RE_URHEBER_DATUM = re.compile(
r"'WEV32'\s*=>\s*\[\s*\{[^}]*'main'\s*=>\s*[\"']Antrag\s+(.+?)\s+(\d{1,2}\.\d{1,2}\.\d{4})\s+Drucksache",
)
_RE_PRE_BLOCK = re.compile(r'<pre>\$VAR1 = (.*?)</pre>', re.DOTALL)
# ── Berlin-style hit list (production HTML cards, no Perl dump) ──
# The whole div for one record:
_RE_BE_RECORD = re.compile(
r'<div[^>]*class="[^"]*efxRecordRepeater[^"]*"[^>]*data-efx-rec="[^"]*"[^>]*>(.*?)(?=<div[^>]*efxRecordRepeater|<div[^>]*id="efxResultsEnd"|</main>|$)',
re.DOTALL,
)
_RE_BE_TITLE = re.compile(r'<h3[^>]*class="h5[^"]*"[^>]*>\s*<span>([^<]+)</span>')
_RE_BE_LINK = re.compile(r'<a[^>]*href="([^"]+\.pdf)"[^>]*>')
# The metadata h6 looks like:
# <span class="h6">Antrag (Eilantrag) &nbsp;<a ...>Drucksache 19/3104</a> S. 1 bis 24 vom 31.03.2026</span>
_RE_BE_DRUCKSACHE = re.compile(r'Drucksache\s+(\d+/\d+)')
# BE has "Drucksache 19/3104 S. 1 bis 24 vom 31.03.2026" — date is
# marked by ``vom``. BB has the BE card format too but writes the
# date BEFORE the Drucksachen-Nummer with no marker:
# "Antrag Reinhard Simon (BSW) 17.10.2024 Drucksache 8/2 (1 S.)".
# Try ``vom``-prefix first; fall back to the first plain date.
_RE_BE_DATUM_VOM = re.compile(r'vom\s+(\d{1,2}\.\d{1,2}\.\d{4})')
_RE_BE_DATUM_PLAIN = re.compile(r'(\d{1,2}\.\d{1,2}\.\d{4})')
_RE_BE_DOCTYPE = re.compile(r'<span class="h6">\s*([^<&]+?)(?:&nbsp;|<)')
@staticmethod
def _decode_perl_hex(s: str) -> str:
"""Decode \\x{abcd} escape sequences from Perl Data::Dumper output."""
return re.sub(r'\\x\{([0-9a-f]+)\}', lambda m: chr(int(m.group(1), 16)), s)
def _normalize_fraktion(self, urheber: str) -> list[str]:
"""Thin shim — die ganze Regex-Logik lebt jetzt zentral in
``app.parteien.extract_fraktionen`` (siehe #55). ``self.bundesland``
wird mitgegeben, damit FW-Familien-Aliase korrekt disambiguiert
werden.
"""
from .parteien import extract_fraktionen
return extract_fraktionen(urheber, bundesland=self.bundesland)
def _build_search_body(
self,
wahlperiode: int,
start_date: str,
end_date: str,
) -> dict:
"""Build the action JSON body for browse.tt.json.
The schema is taken from dokukratie's portala.query.json template
and only differs in the data source and the variable substitutions.
When ``self.document_type`` is None, the ETYPF/DTYPF/DART subtree
is dropped — useful for parliaments whose ETYPF index uses
different value strings than ``"Antrag"``.
"""
document_type = self.document_type
date_range_text = f"{start_date} THRU {end_date}"
date_term = lambda sf, num: { # noqa: E731 — local helper
"tn": "trange", "sf": sf, "op": "eq", "num": num,
"idx": 119, "l": 3,
"p1": start_date, "t1": start_date,
"p2": end_date, "t2": end_date,
"t": date_range_text,
}
# Build the search.lines (form-state mirror) and the json tree
lines: dict = {
"2": str(wahlperiode),
"10": start_date,
"11": end_date,
"20.1": "alWEBBI",
"20.2": "alWEBBI",
"20.3": "alWEBBI",
"90.1": "AND",
"90.2": "AND",
"90.3": "AND",
}
if document_type is not None:
lines["3"] = document_type
lines["4"] = "D"
# Top-level AND tree
top_terms: list = [
{"tn": "term", "t": str(wahlperiode), "idx": 6, "l": 3,
"sf": "WP", "op": "eq", "num": 5},
]
if document_type is not None:
top_terms.append({"tn": "or", "num": 3, "terms": [
{"tn": "or", "num": 4, "terms": [
{"tn": "term", "t": f'"{document_type}"', "idx": 50,
"l": 4, "sf": "ETYPF", "op": "eq", "num": 10},
{"tn": "term", "t": f'"{document_type}"', "idx": 50,
"l": 4, "sf": "ETYP2F", "op": "eq", "num": 11},
{"tn": "term", "t": f'"{document_type}"', "idx": 50,
"l": 4, "sf": "DTYPF", "op": "eq", "num": 12},
{"tn": "term", "t": f'"{document_type}"', "idx": 50,
"l": 4, "sf": "DTYP2F", "op": "eq", "num": 13},
{"tn": "term", "t": f'"{document_type}"', "idx": 50,
"l": 4, "sf": "1VTYPF", "op": "eq", "num": 14},
]},
{"tn": "or", "num": 15, "terms": [
{"tn": "term", "t": '"D"', "idx": 93, "l": 4,
"sf": "DART", "op": "eq", "num": 16},
{"tn": "term", "t": '"D"', "idx": 93, "l": 4,
"sf": "DARTS", "op": "eq", "num": 17},
]},
]})
if not self.omit_date_filter:
top_terms.append({"tn": "or", "num": 18, "terms": [
{"tn": "or", "num": 19, "terms": [
date_term("DAT", 20),
date_term("DDAT", 21),
]},
date_term("SDAT", 22),
]})
if self.typ_filter is not None:
top_terms.append({"tn": "term", "t": self.typ_filter, "idx": 156, "l": 1,
"sf": "TYP", "op": "eq", "num": 23})
# Mirror the same shape into the parsed/sref display strings
typ_clause = f" AND TYP={self.typ_filter}" if self.typ_filter is not None else ""
date_clause = (
f" AND (DAT,DDAT,SDAT= {date_range_text})"
if not self.omit_date_filter else ""
)
if document_type is not None:
parsed = (
f"((/WP {wahlperiode}) AND "
f"(/ETYPF,ETYP2F,DTYPF,DTYP2F,1VTYPF (\"{document_type}\")) "
f"AND (/DART,DARTS (\"D\")){date_clause}){typ_clause}"
)
else:
parsed = f"((/WP {wahlperiode}){date_clause}){typ_clause}"
return {
"action": "SearchAndDisplay",
"sources": [self.db_id],
"report": {
"rhl": "main",
"rhlmode": "add",
"format": "generic1-full",
"mime": "html",
"sort": "WEVSO1/D WEVSO2 WEVSO3",
},
"search": {
"lines": lines,
"serverrecordname": "sr_generic1",
"parsed": parsed,
"sref": parsed,
"json": [{
"tn": "and",
"num": 1,
"terms": top_terms,
}],
},
"dataSet": "1",
}
@staticmethod
def _datum_de_to_iso(datum_de: str) -> str:
"""Convert DD.MM.YYYY → YYYY-MM-DD; return '' for empty input."""
if not datum_de:
return ""
d, m, y = datum_de.split(".")
return f"{y}-{m.zfill(2)}-{d.zfill(2)}"
def _parse_hit_list_html(self, html: str, query_filter: str = "") -> list[Drucksache]:
"""Extract Drucksachen from a report.tt.html response.
Two formats are supported and auto-detected:
- **LSA-style:** the records are embedded as Perl Data::Dumper
dumps inside ``<pre>$VAR1 = …</pre>`` blocks. WEV06 → title,
WEV32 → metadata + PDF path. Used by Sachsen-Anhalt's PADOKA
template.
- **Berlin-style:** standard production HTML cards with
``efxRecordRepeater`` divs. Title in an ``<h3 class="h5">``,
metadata + PDF link in an ``<span class="h6">``. Used by
Berlin's PARDOK template.
"""
if self._RE_PRE_BLOCK.search(html):
return self._parse_hit_list_dump(html, query_filter)
return self._parse_hit_list_cards(html, query_filter)
def _parse_hit_list_dump(self, html: str, query_filter: str) -> list[Drucksache]:
"""Parse LSA-style ``<pre>$VAR1 = …</pre>`` Perl-dump records."""
results: list[Drucksache] = []
for pre in self._RE_PRE_BLOCK.findall(html):
m_ds = self._RE_DRUCKSACHE.search(pre)
if not m_ds:
continue
drucksache = m_ds.group(1)
m_t = self._RE_TITLE.search(pre)
title = self._decode_perl_hex(m_t.group(1)) if m_t else f"Drucksache {drucksache}"
m_pdf = self._RE_PDF.search(pre)
pdf_rel = m_pdf.group(1) if m_pdf else ""
pdf_url = f"{self.base_url}{self.pdf_url_prefix}{pdf_rel}" if pdf_rel else ""
m_w32 = self._RE_URHEBER_DATUM.search(pre)
urheber = self._decode_perl_hex(m_w32.group(1).strip()) if m_w32 else ""
datum_iso = self._datum_de_to_iso(m_w32.group(2) if m_w32 else "")
fraktionen = self._normalize_fraktion(urheber) if urheber else []
doc = Drucksache(
drucksache=drucksache,
title=title,
fraktionen=fraktionen,
datum=datum_iso,
link=pdf_url,
bundesland=self.bundesland,
typ="Antrag",
)
if query_filter:
hay = f"{title} {urheber}".lower()
if not all(t in hay for t in query_filter.lower().split()):
continue
results.append(doc)
return results
def _parse_hit_list_cards(self, html: str, query_filter: str) -> list[Drucksache]:
"""Parse Berlin-style ``efxRecordRepeater`` HTML-card records.
Each card contains an ``<h3>`` title, a metadata ``<span class="h6">``
with the document type, the Drucksachen-Nummer, and the date,
plus a direct ``<a href="…pdf">`` link to the PDF on the same host.
"""
results: list[Drucksache] = []
# Split the HTML on every record-div opener — easier than balancing
# divs with regex.
chunks = html.split('class="record')
# First chunk is the prelude, skip it
for chunk in chunks[1:]:
# Each chunk now starts at the record class attribute
m_t = self._RE_BE_TITLE.search(chunk)
title = m_t.group(1).strip() if m_t else "Ohne Titel"
m_ds = self._RE_BE_DRUCKSACHE.search(chunk)
if not m_ds:
continue
drucksache = m_ds.group(1)
m_pdf = self._RE_BE_LINK.search(chunk)
pdf_url = ""
if m_pdf:
href = m_pdf.group(1)
if href.startswith("http://") or href.startswith("https://"):
pdf_url = href
elif href.startswith("/"):
pdf_url = f"{self.base_url}{href}"
else:
pdf_url = f"{self.base_url}{self.pdf_url_prefix}{href}"
m_dat = self._RE_BE_DATUM_VOM.search(chunk) or self._RE_BE_DATUM_PLAIN.search(chunk)
datum_iso = self._datum_de_to_iso(m_dat.group(1) if m_dat else "")
m_doc = self._RE_BE_DOCTYPE.search(chunk)
doctype_full = m_doc.group(1).strip() if m_doc else "Drucksache"
# Berlin often packs the originator(s) into the same h6 line:
# "Antrag CDU, SPD" → fraktionen = ["CDU","SPD"], typ = "Antrag"
# Senat-Vorlagen carry no fraction, only "Vorlage zur …".
fraktionen = self._normalize_fraktion(doctype_full)
# Strip the fraction names back out of the typ string so the UI
# shows a clean "Antrag" / "Vorlage …" label.
typ = doctype_full
if fraktionen:
# Cut at the first occurrence of any party name
cuts = [typ.upper().find(f.upper()) for f in fraktionen]
cuts = [c for c in cuts if c >= 0]
if cuts:
typ = typ[: min(cuts)].rstrip(" ,")
doc = Drucksache(
drucksache=drucksache,
title=title,
fraktionen=fraktionen,
datum=datum_iso,
link=pdf_url,
bundesland=self.bundesland,
typ=typ,
)
if query_filter:
hay = f"{title} {doctype_full}".lower()
if not all(t in hay for t in query_filter.lower().split()):
continue
results.append(doc)
return results
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
"""Search recent documents of the current Wahlperiode.
``query`` is applied as a client-side title/Urheber filter; the
server-side query covers the configured ``date_window_days``
(default 24 months).
"""
from datetime import date, timedelta
end = date.today()
start = end - timedelta(days=self.date_window_days)
body = self._build_search_body(
wahlperiode=self.wahlperiode,
start_date=start.isoformat(),
end_date=end.isoformat(),
)
browse_html = f"{self.base_url}{self.portala_path}/browse.tt.html"
browse_json = f"{self.base_url}{self.portala_path}/browse.tt.json"
report_html = f"{self.base_url}{self.portala_path}/report.tt.html"
async with httpx.AsyncClient(
# Bumped from 30s for #13 quick-win: chunksize=500 against the
# LSA report.tt.html endpoint occasionally takes 30+ seconds.
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
# Step 1: warm up cookies via the browse page
await client.get(browse_html)
# Step 2: submit the search action
resp = await client.post(
browse_json,
json=body,
headers={"Referer": browse_html},
)
if resp.status_code != 200:
logger.error("%s search HTTP %s", self.bundesland, resp.status_code)
return []
data = resp.json()
report_id = data.get("report_id")
if not report_id:
logger.error("%s: no report_id in response: %s", self.bundesland, data)
return []
# Step 3: fetch the HTML hit list
# Take a generous chunk so the client-side title filter
# still has enough material to work with. Quick-win for #13
# until the eUI sf-Index for real server-side fulltext is
# reverse-engineered: bump the unfiltered chunk floor and
# the query-filtered chunk ceiling.
chunksize = max(limit * 10, 500) if query else max(limit * 2, 100)
report_resp = await client.post(
report_html,
json={"report_id": report_id, "start": 0, "chunksize": chunksize},
headers={"Referer": browse_html},
)
if report_resp.status_code != 200:
logger.error("%s report HTTP %s", self.bundesland, report_resp.status_code)
return []
results = self._parse_hit_list_html(report_resp.text, query_filter=query)
return results[:limit]
except Exception:
logger.exception("%s search error", self.bundesland)
return []
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Look up a single document by ID via the search endpoint with a
document_number filter."""
# Pragmatic MVP: do a broad search and filter for the requested ID.
# A targeted single-document fetch would require a different
# action.search.json structure that we have not reverse-engineered yet.
results = await self.search(query="", limit=200)
for doc in results:
if doc.drucksache == drucksache:
return doc
return None
async def download_text(self, drucksache: str) -> Optional[str]:
"""Download the PDF for a Drucksache and extract its text."""
import fitz # PyMuPDF
doc = await self.get_document(drucksache)
if not doc or not doc.link:
return None
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
resp = await client.get(doc.link)
if resp.status_code != 200:
return None
pdf = fitz.open(stream=resp.content, filetype="pdf")
text = ""
for page in pdf:
text += page.get_text()
pdf.close()
return text
except Exception:
logger.exception("%s download error for %s", self.bundesland, drucksache)
return None
class ParLDokAdapter(ParlamentAdapter):
"""Adapter for ParlDok 8.x parliament documentation systems (J3S GmbH).
ParlDok is a proprietary parliament documentation product by J3S GmbH
(https://www.j3s.de). Different from the portala/eUI framework used by
LSA/BE: ParlDok 8.x is a single-page app whose backend is a JSON API
rooted at ``{base_url}{prefix}/Fulltext/...``. The legacy ParLDok 5.x
HTML POST form (``parldok/formalkriterien``) used by dokukratie's MV
YAML scraper has been deprecated by the LandtagMV upgrade to 8.3.5.
Confirmed instances using this engine (April 2026):
- **MV** (Mecklenburg-Vorpommern) — ``dokumentation.landtag-mv.de/parldok``
- HH, SN, TH all advertise ParlDok in dokukratie but their actual
versions/themes have not been verified yet.
Search workflow:
1. ``GET {base_url}{prefix}/`` to obtain the session cookie. The
backend rejects POSTs without it.
2. ``POST {base_url}{prefix}/Fulltext/Search`` with form-encoded
``data=<json>`` payload. The JSON carries a ``tags`` array of
facet selections; each tag is ``{"type": <facet_type_int>,
"id": <facet_value>}``. Reverse-engineered facet type constants
from the bundle.js (``pd.facet_*``):
- ``facet_fraction = 2``
- ``facet_kind = 7`` (Drucksache, Plenarprotokoll, …)
- ``facet_type = 8`` (Antrag, Gesetzentwurf, Kleine Anfrage, …)
- ``facet_lp = 10`` (Wahlperiode)
Response is JSON ``{success, data: <stringified JSON>}`` where the
inner ``data`` carries ``{count, docs: [{id, title, date,
authorhtml, kind, type, lp, number, link, ...}], ...}``.
3. PDF download: ``GET {base_url}{prefix}/dokument/{numeric_id}``.
Returns ``application/pdf`` directly. The ``link`` field returned
by the search API already contains the path fragment
``/dokument/<id>#navpanes=0`` — strip the fragment and prepend
the configured ``prefix``.
Drucksachen-Nummer is reconstructed as ``f"{lp}/{number}"`` from the
search hit. Full-text search is *not* implemented in this MVP — the
backend supports it via ``facet_fulltext = 0`` tags but the public
LP-only filter already returns the relevant Antrag pool. ``query``
is applied as a client-side title/Urheber filter.
"""
# Reverse-engineered facet type constants from bundle.js (pd.facet_*).
FACET_FULLTEXT = 0
FACET_FRACTION = 2
FACET_KIND = 7
FACET_TYPE = 8
FACET_LP = 10
def __init__(
self,
*,
bundesland: str,
name: str,
base_url: str,
wahlperiode: int,
prefix: str = "/parldok",
document_typ: str = "Antrag",
document_typ_substring: bool = False,
kinds: Optional[list[str]] = None,
) -> None:
"""Configure a ParlDok 8.x adapter for one specific parliament.
Args:
bundesland: state code, e.g. ``"MV"``.
name: human-readable label.
base_url: ``https://...`` host root, no trailing slash.
wahlperiode: current legislative period — fed into the
``facet_lp`` tag of the search payload.
prefix: app prefix where ParlDok lives. ``/parldok`` for MV.
document_typ: client-side filter on the ``type`` field of
each hit ("Antrag", "Gesetzentwurf", …). Set to empty
string to disable type filtering.
document_typ_substring: if True, ``document_typ`` is matched
as a substring against the hit's ``type`` field instead
of an exact match. Needed for instances where the
Drucksachen-Anträge live under composite type strings
like ``"Antrag gemäß § 79 GO"`` (Thüringen) — strict
``"Antrag"`` would never match.
kinds: optional list of acceptable ``kind`` values. Defaults
to ``["Drucksache"]`` if None — but TH packs its Anträge
under ``kind="Vorlage"`` so the parameter has to be
widened there.
"""
self.bundesland = bundesland
self.name = name
self.base_url = base_url.rstrip("/")
self.prefix = "/" + prefix.strip("/")
self.wahlperiode = wahlperiode
self.document_typ = document_typ
self.document_typ_substring = document_typ_substring
self.kinds = kinds if kinds is not None else ["Drucksache"]
def _hit_matches_filters(self, hit: dict) -> bool:
"""Apply the kind/typ filters to a raw hit dict.
Centralised so the search loop can short-circuit cleanly. ``hit``
comes from ``Fulltext/Search`` or ``Fulltext/Resultpage`` JSON
responses; both share the same record schema.
"""
if self.kinds and hit.get("kind") not in self.kinds:
return False
hit_type = (hit.get("type") or "").strip()
if self.document_typ:
if self.document_typ_substring:
if self.document_typ not in hit_type:
return False
else:
if hit_type != self.document_typ:
return False
return True
@staticmethod
def _datum_de_to_iso(datum_de: str) -> str:
"""DD.MM.YYYY → YYYY-MM-DD; '' for empty input."""
if not datum_de:
return ""
try:
d, m, y = datum_de.split(".")
return f"{y}-{m.zfill(2)}-{d.zfill(2)}"
except ValueError:
return ""
def _normalize_fraktion(self, authorhtml: str) -> list[str]:
"""Thin shim — siehe ``app.parteien.extract_fraktionen``. #55."""
from .parteien import extract_fraktionen
return extract_fraktionen(authorhtml, bundesland=self.bundesland)
@staticmethod
def _fulltext_id(term: str) -> str:
"""Sanitize a search term to ParlDok's facet ID format.
Mirrors ``pd.getFulltextId`` from ``bundle.js``: replace every
non-alphanumeric character with ``-``. The server uses this to
deduplicate identical search facets.
"""
return re.sub(r"[^a-zA-Z0-9]", "-", term)
def _build_search_body(self, *, length: int = 100, query: str = "") -> dict:
"""Build the JSON payload for the initial ``Fulltext/Search`` call.
Filters by Wahlperiode only — type/kind/fulltext filtering all
happen client-side after the hit list is paginated. The
``query`` parameter is accepted for API compatibility but is
currently NOT forwarded to the server (#18: uniform client-side
title search, no server-side fulltext, because otherwise the
behaviour would be asymmetric across adapters). The
``FACET_FULLTEXT`` constant and :meth:`_fulltext_id` helper
are kept around as documentation for the previous #12
server-side variant — when fulltext gets uniformly
re-introduced later, the dormant tag is just::
{"type": self.FACET_FULLTEXT,
"id": self._fulltext_id(query),
"fulltext": query, "label": query, "field": "Alle"}
Pagination beyond the first page goes through
``Fulltext/Resultpage`` — the ``Search`` endpoint itself
ignores any non-zero ``Start``.
"""
del query # explicitly unused — see docstring
tags: list[dict] = [{"type": self.FACET_LP, "id": self.wahlperiode}]
return {
"devicekey": "",
"max": length,
"withfilter": False,
# sort=2 → newest first (date desc); sort=1 is relevance.
"sort": 2,
"topk": length,
"llm": 0,
"newdocsearch": False,
"limit": {"Start": 0, "Length": length},
"tags": tags,
"updateFilters": [],
}
def _hit_to_drucksache(self, hit: dict) -> Optional[Drucksache]:
"""Convert one ParlDok JSON hit to a Drucksache. None if unusable."""
lp = hit.get("lp")
number = hit.get("number")
if not lp or not number:
return None
link_field = hit.get("link") or hit.get("prelink") or ""
# Strip "#navpanes=0" fragment and prepend the prefix.
path = link_field.split("#", 1)[0]
pdf_url = f"{self.base_url}{self.prefix}{path}" if path else ""
return Drucksache(
drucksache=f"{lp}/{number}",
title=hit.get("title", ""),
fraktionen=self._normalize_fraktion(hit.get("authorhtml", "")),
datum=self._datum_de_to_iso(hit.get("date", "")),
link=pdf_url,
bundesland=self.bundesland,
typ=hit.get("type", "") or hit.get("kind", ""),
)
async def _post_json(
self, client: httpx.AsyncClient, endpoint: str, payload: dict,
) -> Optional[dict]:
"""POST a JSON-stringified payload to a ParlDok endpoint.
``endpoint`` is the path tail (e.g. ``"Fulltext/Search"`` or
``"Fulltext/Resultpage"``). Returns the inner JSON object
(already parsed from the stringified ``data`` field), or None
on error.
"""
homepage = f"{self.base_url}{self.prefix}/"
url = f"{self.base_url}{self.prefix}/{endpoint}"
try:
resp = await client.post(
url,
data={"data": json.dumps(payload, ensure_ascii=False)},
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": homepage,
},
)
if resp.status_code != 200:
logger.error(
"%s %s HTTP %s",
self.bundesland, endpoint, resp.status_code,
)
return None
outer = resp.json()
if not outer.get("success"):
logger.error(
"%s %s not successful: %s",
self.bundesland, endpoint, outer.get("message"),
)
return None
return json.loads(outer["data"])
except Exception:
logger.exception("%s ParlDok %s error", self.bundesland, endpoint)
return None
async def _initial_search(
self, client: httpx.AsyncClient, *, length: int,
) -> tuple[Optional[int], list[dict]]:
"""Run the initial ``Fulltext/Search`` and return ``(queryid, docs)``.
The ``queryid`` is needed for subsequent ``Fulltext/Resultpage``
calls. ParlDok ignores any non-zero ``Start`` on this endpoint —
the first 100 hits are the only ones reachable via ``Search``.
"""
body = self._build_search_body(length=length)
inner = await self._post_json(client, "Fulltext/Search", body)
if not inner:
return None, []
return inner.get("queryid"), (inner.get("docs") or [])
async def _result_page(
self, client: httpx.AsyncClient, *, queryid: int, start: int, length: int,
) -> list[dict]:
"""Fetch a further result page via ``Fulltext/Resultpage``."""
payload = {
"devicekey": "",
"queryid": queryid,
"limit": {"Start": start, "Length": length},
}
inner = await self._post_json(client, "Fulltext/Resultpage", payload)
if not inner:
return []
return inner.get("docs") or []
def _make_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
timeout=30,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
)
async def _paginated_hits(self, client: httpx.AsyncClient):
"""Async iterator over Drucksachen-style hits across pages.
Yields raw hit dicts in newest-first order. The first batch comes
from ``Fulltext/Search``, subsequent batches from
``Fulltext/Resultpage`` using the queryid the server returned for
the initial call. Stops when a page comes back empty, undersized,
or after :attr:`MAX_PAGES` iterations.
"""
queryid, hits = await self._initial_search(client, length=self.PAGE_SIZE)
for hit in hits:
yield hit
if not queryid or len(hits) < self.PAGE_SIZE:
return
for page in range(1, self.MAX_PAGES):
page_hits = await self._result_page(
client,
queryid=queryid,
start=page * self.PAGE_SIZE,
length=self.PAGE_SIZE,
)
if not page_hits:
return
for hit in page_hits:
yield hit
if len(page_hits) < self.PAGE_SIZE:
return
# ParlDok 8.x caps Length per request at 100 — paginate if needed.
PAGE_SIZE = 100
# Safety bound: scan at most 10 pages × 100 = 1000 most recent docs.
# Anträge are ~3% of all hits in MV, so 1000 raw → ~30 Anträge, more
# than enough for the typical UI request (limit 5..20). Filtered
# queries that find nothing in the last 1000 docs return empty
# rather than scan the entire WP — same trade-off as the BE/LSA
# PortalaAdapter quick-win window.
MAX_PAGES = 10
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
"""Search the configured Wahlperiode, sorted newest-first.
#18: uniform behaviour: the server filters by WP only, the client
paginates over the whole WP and filters locally for hits in the
title or Urheber. The fulltext filter (#12) has been rolled back
because otherwise the behaviour would be asymmetric across
adapters. Sorting comes from the server (newest-first via
``sort=2`` in :meth:`_build_search_body`).
Dedupe by ``lp/number`` because ParlDok returns the same Drucksache
multiple times across different Vorgänge/deliberations.
"""
results: list[Drucksache] = []
seen: set[str] = set()
query_terms = [t.lower() for t in query.split() if t] if query else []
async with self._make_client() as client:
await client.get(f"{self.base_url}{self.prefix}/")
async for hit in self._paginated_hits(client):
if not self._hit_matches_filters(hit):
continue
doc = self._hit_to_drucksache(hit)
if not doc:
continue
if doc.drucksache in seen:
continue
seen.add(doc.drucksache)
if query_terms:
hay = f"{doc.title} {hit.get('authorhtml', '')}".lower()
if not all(t in hay for t in query_terms):
continue
results.append(doc)
if len(results) >= limit:
return results
return results
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Look up a single Antrag by ``lp/number`` ID.
Pragmatic MVP: page through the WP unfiltered until we find a
match. ParlDok offers a ``facet_number`` (14) facet that would
let us target the lookup directly, but the facet ID values are
instance-specific (would require a ``Fulltext/Filter`` discovery
call) and the WP-wide pagination is fast enough for the typical
2k-10k Drucksachen per period.
"""
wanted_lp, wanted_num = (drucksache.split("/", 1) + [""])[:2]
if not wanted_num:
return None
async with self._make_client() as client:
await client.get(f"{self.base_url}{self.prefix}/")
async for hit in self._paginated_hits(client):
# Don't apply doc-type filters here — get_document is
# used to look up arbitrary Drucksachen, including ones
# whose kind/typ doesn't match the search-time filter.
if str(hit.get("lp")) == wanted_lp and str(hit.get("number")) == wanted_num:
return self._hit_to_drucksache(hit)
return None
async def download_text(self, drucksache: str) -> Optional[str]:
"""Download the PDF for a Drucksache and extract its text."""
import fitz # PyMuPDF
doc = await self.get_document(drucksache)
if not doc or not doc.link:
return None
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
resp = await client.get(doc.link)
if resp.status_code != 200:
logger.error(
"%s PDF HTTP %s for %s (%s)",
self.bundesland, resp.status_code, drucksache, doc.link,
)
return None
pdf = fitz.open(stream=resp.content, filetype="pdf")
text = ""
for page in pdf:
text += page.get_text()
pdf.close()
return text
except Exception:
logger.exception("%s ParlDok download error for %s", self.bundesland, drucksache)
return None
class StarFinderCGIAdapter(ParlamentAdapter):
"""Adapter for old-school CGI Starfinder instances.
Currently used by Schleswig-Holstein on
``lissh.lvn.parlanet.de/cgi-bin/starfinder/0`` — the **oldest** of the
parliament backends we touch. Predates StarWeb's HTML form-submit
machinery: instead of submitting a stateful AdvancedSearch form
(which BB/HE/NI/RP/HB do), Starfinder accepts the entire query as
URL parameters and returns plain HTML with a flat ``<tr>`` table of
records.
Reverse-engineering source: ``dokukratie/sh.yml`` plus a probe
against the live endpoint. Format details:
- URL template: ``{base}/cgi-bin/starfinder/0?path={db_path}&id=FASTLINK
&pass=&search={starfinder_query}&format=WEBKURZFL``
- Query syntax: ``WP=20+AND+dtyp=antrag`` (URL-encoded). The
``dtyp`` codes are lowercase short labels (``antrag``, ``kleine``).
- Encoding: ``iso-8859-1`` (Latin-1) — NOT UTF-8. The HTTP response
doesn't always declare it via Content-Type, so we explicitly
decode with ``latin1`` to avoid mojibake on the German umlauts.
- Hit-format: each record is one ``<tr class="tabcol|tabcol2|tabcol3">``
with the title in ``<b>``, then ``Antrag <Urheber> <DD.MM.YYYY>
Drucksache <a href="...pdf">XX/YYYY</a>``.
"""
_RE_RECORD = re.compile(
r'<tr class="tabcol[23]?">.*?</tr>',
re.DOTALL,
)
_RE_TITLE = re.compile(r"<b>(.*?)</b>", re.DOTALL)
_RE_DRUCKSACHE_LINK = re.compile(
r'<a href="([^"]+\.pdf)"[^>]*>(\d+/\d+)</a>'
)
# The line between <b>title</b> and the <a>-link looks like:
# "Antrag Christian Dirschauer (SSW) 07.04.2026 Drucksache "
# We pull the originator(s) and the date out of it.
_RE_URHEBER_DATUM = re.compile(
r"</b>\s*<br>\s*[A-Za-zÄÖÜäöüß]+\s+(.+?)\s+(\d{1,2}\.\d{1,2}\.\d{4})\s+Drucksache",
re.DOTALL,
)
def __init__(
self,
*,
bundesland: str,
name: str,
base_url: str,
wahlperiode: int,
db_path: str = "lisshfl.txt",
document_typ_code: str = "antrag",
) -> None:
self.bundesland = bundesland
self.name = name
self.base_url = base_url.rstrip("/")
self.wahlperiode = wahlperiode
self.db_path = db_path
self.document_typ_code = document_typ_code
@staticmethod
def _datum_de_to_iso(datum_de: str) -> str:
if not datum_de:
return ""
try:
d, m, y = datum_de.split(".")
return f"{y}-{m.zfill(2)}-{d.zfill(2)}"
except ValueError:
return ""
def _normalize_fraktion(self, text: str) -> list[str]:
"""Thin shim — siehe ``app.parteien.extract_fraktionen``. #55.
SH-spezifisch: SSW gehört zur SH-Tabelle und wird durch
``bundesland=SH`` korrekt mit-extrahiert.
"""
from .parteien import extract_fraktionen
return extract_fraktionen(text, bundesland=self.bundesland)
def _build_url(self) -> str:
"""Build the Starfinder URL for the structural WP+dtyp browse.
Free-text filtering is done client-side on the parsed records
(consistent with #18: all adapters uniformly apply a client-side
title filter without server-side fulltext, because otherwise the
behaviour would be asymmetric across adapters).
"""
search_param = f"WP={self.wahlperiode}+AND+dtyp={self.document_typ_code}"
return (
f"{self.base_url}/cgi-bin/starfinder/0"
f"?path={self.db_path}&id=FASTLINK&pass=&search={search_param}"
f"&format=WEBKURZFL"
)
def _parse_records(self, html: str) -> list[Drucksache]:
results: list[Drucksache] = []
for record_html in self._RE_RECORD.findall(html):
m_link = self._RE_DRUCKSACHE_LINK.search(record_html)
if not m_link:
continue
pdf_url, drucksache = m_link.group(1), m_link.group(2)
m_title = self._RE_TITLE.search(record_html)
title = re.sub(r"\s+", " ", m_title.group(1)).strip() if m_title else f"Drucksache {drucksache}"
urheber = ""
datum_iso = ""
m_meta = self._RE_URHEBER_DATUM.search(record_html)
if m_meta:
urheber = m_meta.group(1).strip()
datum_iso = self._datum_de_to_iso(m_meta.group(2))
results.append(Drucksache(
drucksache=drucksache,
title=title,
fraktionen=self._normalize_fraktion(urheber),
datum=datum_iso,
link=pdf_url,
bundesland=self.bundesland,
typ="Antrag",
))
return results
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
url = self._build_url()
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
resp = await client.get(url)
if resp.status_code != 200:
logger.error("%s search HTTP %s", self.bundesland, resp.status_code)
return []
# Force latin1 because the Starfinder server doesn't always
# advertise the encoding correctly.
html = resp.content.decode("latin-1", errors="replace")
results = self._parse_records(html)
except Exception:
logger.exception("%s search error", self.bundesland)
return []
# Client-side title + Urheber filter (see #18)
if query:
terms = [t.lower() for t in query.split() if t]
results = [
d for d in results
if all(t in f"{d.title} {' '.join(d.fraktionen)}".lower() for t in terms)
]
return results[:limit]
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Look up a single Drucksache by ID.
SH responses are pre-sorted newest-first; we re-fetch up to 200
records and scan for the exact match. The Starfinder server
doesn't expose a number-only filter that we know of.
"""
results = await self.search(query="", limit=200)
for doc in results:
if doc.drucksache == drucksache:
return doc
return None
async def download_text(self, drucksache: str) -> Optional[str]:
import fitz # PyMuPDF
doc = await self.get_document(drucksache)
if not doc or not doc.link:
return None
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
resp = await client.get(doc.link)
if resp.status_code != 200:
return None
pdf = fitz.open(stream=resp.content, filetype="pdf")
text = ""
for page in pdf:
text += page.get_text()
pdf.close()
return text
except Exception:
logger.exception("%s PDF download error for %s", self.bundesland, drucksache)
return None
class BayernAdapter(ParlamentAdapter):
"""Adapter for Bayerischer Landtag."""
bundesland = "BY"
name = "Bayerischer Landtag"
base_url = "https://www.bayern.landtag.de"
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
# TODO: Implement Bayern search
return []
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
# TODO: Implement
return None
async def download_text(self, drucksache: str) -> Optional[str]:
return None
class PARLISAdapter(ParlamentAdapter):
"""Adapter for Baden-Württemberg's PARLIS — eUI/portala-Variante mit
polling und JSON-in-HTML-Comment-Records.
PARLIS auf ``parlis.landtag-bw.de`` läuft technisch auf demselben
eUI-Backend wie LSA-PADOKA und BE-PARDOK, aber mit drei wichtigen
Unterschieden, die eine eigene Klasse statt einer PortalaAdapter-
Subklasse rechtfertigen:
1. **Body-Schema:** Statt der portala/LSA-typischen ``search.lines``
mit ``2/3/4/10/11/20.x/90.x``-Slots nutzt PARLIS ein viel kürzeres
``l1/l2/l3/l4`` Schema (siehe ``dokukratie/scrapers/portala.query.bw.json``).
``serverrecordname`` ist ``"vorgang"`` statt ``"sr_generic1"``,
``format`` ist ``"suchergebnis-vorgang-full"``, ``sort`` ist
``"SORT01/D SORT02/D SORT03"``. Es gibt kein ``parsed`` und kein
``json``-Tree — der Server akzeptiert das minimale Schema direkt.
2. **Async polling:** Im Gegensatz zu LSA/BE liefert die initiale
``Fulltext/Search``-Antwort nur eine ``search_id`` mit
``status: "running"``, KEINE ``report_id``. Erst eine zweite
``SearchAndDisplay``-Anfrage mit ``id: <search_id>`` (und ohne
``search``-Component) bekommt die fertige ``report_id`` zurück.
In meinen Live-Tests reichte ein einziger 2-Sekunden-Sleep
zwischen den Calls.
3. **Hit-Format:** Die ``report.tt.html``-Antwort liefert keine
Perl-Dump-Blöcke (LSA) und keine Bootstrap-Card-Divs (BE),
sondern **JSON-Records in HTML-Kommentaren**::
<!--{"WMV33":[{"main":"Schlagworte"}],
"EWBV22":[{"main":"Drucksache 17/10323"}],
"EWBD05":[{"main":"https://.../17_10323.pdf"}],
"EWBV23":[{"main":"Antrag Felix Herkens (GRÜNE) u. a. 16.03.2026"}],
...}-->
Der Parser zieht die Comments raw raus und mappt die WMV/EWBV-
Felder auf das ``Drucksache``-Dataclass.
Reverse-Engineering-Quelle: ``dokukratie/scrapers/portala.query.bw.json``
+ Live-HAR gegen ``parlis.landtag-bw.de`` (Issue #29).
"""
# Reverse-engineered field map for the JSON records that come embedded
# in HTML comments inside report.tt.html responses.
#
# Records look like ``<!--{"WMV33":[...],...}-->`` and may contain
# nested ``<i>...</i>`` highlight tags inside the JSON values.
# Non-greedy match against the literal closing ``}-->`` because that
# delimiter does not appear inside the JSON payload itself.
_RE_RECORD = re.compile(r"<!--(\{.*?\})-->", re.DOTALL)
_RE_DRUCKSACHE = re.compile(r"Drucksache\s+(\d+/\d+)")
_RE_DATUM = re.compile(r"(\d{1,2}\.\d{1,2}\.\d{4})")
def __init__(
self,
*,
bundesland: str,
name: str,
base_url: str,
wahlperiode: int,
prefix: str = "/parlis",
document_typ: str = "Antrag",
date_window_days: int = 730,
poll_attempts: int = 15,
poll_interval_seconds: float = 2.0,
) -> None:
"""Configure a PARLIS adapter for one specific parliament instance.
Args:
bundesland: state code, e.g. ``"BW"``.
name: human-readable label.
base_url: ``https://parlis.landtag-bw.de`` (no trailing slash).
wahlperiode: legislative period — feeds into ``lines.l1``.
prefix: app prefix where PARLIS lives. ``/parlis`` for BW.
document_typ: feeds into ``lines.l4``. The server interprets
this as a German document type label like ``"Antrag"``.
date_window_days: look-back window for the search range,
quick-win against title-only filtering — same approach
as the PortalaAdapter for LSA/BE.
poll_attempts: how many times to poll for ``report_id`` before
giving up. ~15 × 2s = 30s upper bound.
poll_interval_seconds: sleep between poll attempts.
"""
self.bundesland = bundesland
self.name = name
self.base_url = base_url.rstrip("/")
self.prefix = "/" + prefix.strip("/")
self.wahlperiode = wahlperiode
self.document_typ = document_typ
self.date_window_days = date_window_days
self.poll_attempts = poll_attempts
self.poll_interval_seconds = poll_interval_seconds
@staticmethod
def _datum_de_to_iso(datum_de: str) -> str:
"""DD.MM.YYYY → YYYY-MM-DD; '' for empty input."""
if not datum_de:
return ""
try:
d, m, y = datum_de.split(".")
return f"{y}-{m.zfill(2)}-{d.zfill(2)}"
except ValueError:
return ""
def _normalize_fraktion(self, text: str) -> list[str]:
"""Thin shim — siehe ``app.parteien.extract_fraktionen``. #55.
PARLIS packt den Originator in ``EWBV23`` wie
``"Antrag Felix Herkens (GRÜNE), Saskia Frank (GRÜNE)..."``.
"""
from .parteien import extract_fraktionen
return extract_fraktionen(text, bundesland=self.bundesland)
def _build_initial_body(self, start_date: str, end_date: str) -> dict:
"""Build the first ``SearchAndDisplay`` body with the search component.
The schema follows ``dokukratie/scrapers/portala.query.bw.json``
verbatim — only the placeholder values are substituted.
"""
return {
"action": "SearchAndDisplay",
"report": {
"rhl": "main",
"rhlmode": "add",
"format": "suchergebnis-vorgang-full",
"mime": "html",
"sort": "SORT01/D SORT02/D SORT03",
},
"search": {
"lines": {
"l1": str(self.wahlperiode),
"l2": start_date,
"l3": end_date,
"l4": self.document_typ,
},
"serverrecordname": "vorgang",
},
"sources": ["Star"],
}
def _build_poll_body(self, search_id: str) -> dict:
"""Build the polling body — same action, but with the search_id
instead of a fresh search component."""
return {
"action": "SearchAndDisplay",
"report": {
"rhl": "main",
"rhlmode": "add",
"format": "suchergebnis-vorgang-full",
"mime": "html",
"sort": "SORT01/D SORT02/D SORT03",
},
"id": search_id,
"sources": ["Star"],
}
def _hit_record_to_drucksache(self, record: dict) -> Optional[Drucksache]:
"""Map a single JSON-in-comment record to a ``Drucksache``.
PARLIS-record schema (reverse-engineered, all values are arrays
of ``{"main": ...}`` dicts):
- ``EWBV22``: "Drucksache 17/10323"
- ``EWBD05``: direct PDF URL
- ``EWBV23``: "Antrag <Urheber> <DD.MM.YYYY>" — single combined line
- ``WMV30``: short Urheber summary ("Felix Herkens (GRÜNE) u. a.")
- ``WMV33``: subject keywords (Schlagworte)
- ``EWBD01``: "Drucksache <X/Y> <DD.MM.YYYY>"
"""
def first(field: str) -> str:
block = record.get(field)
if isinstance(block, list) and block:
return (block[0].get("main") or "").strip()
return ""
ds_text = first("EWBV22") or first("EWBD01")
m_ds = self._RE_DRUCKSACHE.search(ds_text)
if not m_ds:
return None
drucksache = m_ds.group(1)
# The "title" we want is the Schlagworte/topic, not the
# Drucksachen-Header. PARLIS keeps the human-readable subject
# in WMV33 (Schlagworte joined by semicolons) — that's the
# closest equivalent to "title" the LSA/BE adapters expose.
# Fallback to the EWBV23 line if WMV33 is empty.
schlagworte = first("WMV33")
# Strip embedded <i>...</i> highlight tags
schlagworte_clean = re.sub(r"</?i>", "", schlagworte).strip()
title = schlagworte_clean or first("EWBV23") or f"Drucksache {drucksache}"
# Date + Urheber out of EWBV23 ("Antrag <Urheber> <DD.MM.YYYY>")
ewbv23 = first("EWBV23")
m_dat = self._RE_DATUM.search(ewbv23)
datum_iso = self._datum_de_to_iso(m_dat.group(1) if m_dat else "")
urheber_short = first("WMV30")
fraktionen = self._normalize_fraktion(urheber_short or ewbv23)
pdf_url = first("EWBD05")
return Drucksache(
drucksache=drucksache,
title=title,
fraktionen=fraktionen,
datum=datum_iso,
link=pdf_url,
bundesland=self.bundesland,
typ=self.document_typ,
)
async def _initial_search_and_poll(
self, client: httpx.AsyncClient, start_date: str, end_date: str,
) -> Optional[str]:
"""Run the initial search + poll until ``report_id`` arrives."""
import asyncio
browse_html = f"{self.base_url}{self.prefix}/browse.tt.html"
browse_json = f"{self.base_url}{self.prefix}/browse.tt.json"
# Step 1: warm cookies
await client.get(browse_html)
# Step 2: initial search
try:
resp = await client.post(
browse_json,
json=self._build_initial_body(start_date, end_date),
headers={"Referer": browse_html},
)
except Exception:
logger.exception("%s initial search request error", self.bundesland)
return None
if resp.status_code != 200:
logger.error("%s initial search HTTP %s", self.bundesland, resp.status_code)
return None
data = resp.json()
if data.get("report_id"):
return data["report_id"]
search_id = data.get("search_id")
if not search_id:
logger.error("%s no search_id in initial response: %s", self.bundesland, data)
return None
# Step 3: poll until report_id appears or we run out of attempts
for _ in range(self.poll_attempts):
await asyncio.sleep(self.poll_interval_seconds)
try:
resp = await client.post(
browse_json,
json=self._build_poll_body(search_id),
headers={"Referer": browse_html},
)
except Exception:
logger.exception("%s poll request error", self.bundesland)
return None
if resp.status_code != 200:
logger.error("%s poll HTTP %s", self.bundesland, resp.status_code)
return None
data = resp.json()
if data.get("report_id"):
return data["report_id"]
star = data.get("sources", {}).get("Star", {})
if star.get("status") == "stopped" and not data.get("report_id"):
# Search finished but no report — empty result
return None
logger.warning("%s gave up polling after %d attempts", self.bundesland, self.poll_attempts)
return None
def _parse_report_html(self, html: str) -> list[Drucksache]:
"""Extract Drucksachen from a report.tt.html response.
Records are JSON objects embedded in HTML comments. We pull each
comment block via regex, parse it as JSON, and map the WMV/EWBV
fields to a Drucksache.
"""
results: list[Drucksache] = []
for m in self._RE_RECORD.finditer(html):
json_text = m.group(1)
try:
record = json.loads(json_text)
except json.JSONDecodeError:
continue
doc = self._hit_record_to_drucksache(record)
if doc:
results.append(doc)
return results
async def search(self, query: str, limit: int = 20) -> list[Drucksache]:
"""Search recent BW Anträge with optional client-side title filter.
Server-side full-text is not used (#18: uniform behaviour without
fulltext until all adapters support it). The
client filter looks at title (Schlagworte) + Urheber.
"""
from datetime import date, timedelta
end = date.today()
start = end - timedelta(days=self.date_window_days)
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
report_id = await self._initial_search_and_poll(
client, start.isoformat(), end.isoformat(),
)
if not report_id:
return []
# Pull a generous chunk so the client-side filter has
# enough material to work with.
chunksize = max(limit * 10, 200) if query else max(limit * 2, 50)
report_url = (
f"{self.base_url}{self.prefix}/report.tt.html"
f"?report_id={report_id}&start=0&chunksize={chunksize}"
)
resp = await client.get(
report_url,
headers={"Referer": f"{self.base_url}{self.prefix}/browse.tt.html"},
)
if resp.status_code != 200:
logger.error("%s report HTTP %s", self.bundesland, resp.status_code)
return []
results = self._parse_report_html(resp.text)
except Exception:
logger.exception("%s search error", self.bundesland)
return []
# Client-side filter
if query:
terms = [t.lower() for t in query.split() if t]
results = [
d for d in results
if all(t in f"{d.title} {' '.join(d.fraktionen)}".lower() for t in terms)
]
return results[:limit]
async def get_document(self, drucksache: str) -> Optional[Drucksache]:
"""Look up a single Drucksache by ID via a broad browse."""
results = await self.search(query="", limit=200)
for doc in results:
if doc.drucksache == drucksache:
return doc
return None
async def download_text(self, drucksache: str) -> Optional[str]:
"""Download the PDF for a Drucksache and extract its text."""
import fitz # PyMuPDF
doc = await self.get_document(drucksache)
if not doc or not doc.link:
return None
async with httpx.AsyncClient(
timeout=60,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 GWOE-Antragspruefer"},
) as client:
try:
resp = await client.get(doc.link)
if resp.status_code != 200:
logger.error(
"%s PDF HTTP %s for %s (%s)",
self.bundesland, resp.status_code, drucksache, doc.link,
)
return None
pdf = fitz.open(stream=resp.content, filetype="pdf")
text = ""
for page in pdf:
text += page.get_text()
pdf.close()
return text
except Exception:
logger.exception("%s PDF download error for %s", self.bundesland, drucksache)
return None
# Registry of adapters
ADAPTERS = {
"NRW": NRWAdapter(),
"LSA": PortalaAdapter(
bundesland="LSA",
name="Landtag von Sachsen-Anhalt (PADOKA)",
base_url="https://padoka.landtag.sachsen-anhalt.de",
db_id="lsa.lissh",
wahlperiode=8,
portala_path="/portal",
document_type="Antrag",
pdf_url_prefix="/files/",
),
"BE": PortalaAdapter(
bundesland="BE",
name="Abgeordnetenhaus von Berlin (PARDOK)",
base_url="https://pardok.parlament-berlin.de",
db_id="lah.lissh",
wahlperiode=19,
portala_path="/portala",
# Berlin's ETYPF index uses different value strings — drop the
# document_type subtree, fall back to client-side title filter.
document_type=None,
# Quick-win for #13: pulled the date window from the original
# 180-day MVP up to 730 days so client-side title-filter searches
# ("Schule" etc.) reach back across more of the WP19 corpus until
# the eUI fulltext-sf is reverse-engineered. The chunksize bump
# in PortalaAdapter.search() means the per-request payload stays
# bounded.
date_window_days=730,
pdf_url_prefix="/files/",
),
"MV": ParLDokAdapter(
bundesland="MV",
name="Landtag Mecklenburg-Vorpommern (ParlDok)",
base_url="https://www.dokumentation.landtag-mv.de",
wahlperiode=8,
prefix="/parldok",
document_typ="Antrag",
),
"HH": ParLDokAdapter(
bundesland="HH",
name="Hamburgische Bürgerschaft (ParlDok)",
base_url="https://www.buergerschaft-hh.de",
wahlperiode=23,
prefix="/parldok",
document_typ="Antrag",
),
"TH": ParLDokAdapter(
bundesland="TH",
name="Thüringer Landtag (ParlDok)",
base_url="https://parldok.thueringer-landtag.de",
wahlperiode=8,
prefix="/parldok",
# TH packs Anträge under composite type strings like
# "Antrag gemäß § 79 GO" with kind="Vorlage", not the
# MV-style kind="Drucksache"/type="Antrag". Substring-match
# on "Antrag" plus widened kind list catches them all.
document_typ="Antrag",
document_typ_substring=True,
kinds=["Drucksache", "Vorlage"],
),
"SH": StarFinderCGIAdapter(
bundesland="SH",
name="Schleswig-Holsteinischer Landtag (LIS-SH)",
base_url="http://lissh.lvn.parlanet.de",
wahlperiode=20,
db_path="lisshfl.txt",
document_typ_code="antrag",
),
"BB": PortalaAdapter(
bundesland="BB",
name="Landtag Brandenburg (parladoku)",
base_url="https://www.parlamentsdokumentation.brandenburg.de",
db_id="lbb.lissh",
wahlperiode=8,
portala_path="/portal",
document_type="Antrag",
# BB packs the date BEFORE the Drucksachen-Nummer in the h6
# line and uses the BE-style efxRecordRepeater HTML cards;
# the auto-detect picks the card path automatically.
),
"RP": PortalaAdapter(
bundesland="RP",
name="Landtag Rheinland-Pfalz (OPAL)",
base_url="https://opal.rlp.de",
db_id="rlp.lissh",
wahlperiode=18,
portala_path="/portal",
document_type="Antrag",
),
"BY": BayernAdapter(),
"BW": PARLISAdapter(
bundesland="BW",
name="Landtag von Baden-Württemberg (PARLIS)",
base_url="https://parlis.landtag-bw.de",
wahlperiode=17,
prefix="/parlis",
document_typ="Antrag",
),
}
def get_adapter(bundesland: str) -> Optional[ParlamentAdapter]:
"""Get adapter for a bundesland."""
return ADAPTERS.get(bundesland)
async def search_all(query: str, bundesland: str = "NRW", limit: int = 20) -> list[Drucksache]:
"""Search parliament documents in a specific state."""
adapter = get_adapter(bundesland)
if not adapter:
return []
return await adapter.search(query, limit)