"""Parliament search adapters for different German states.""" import httpx import re from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Optional from bs4 import BeautifulSoup @dataclass class Drucksache: """A parliamentary document.""" drucksache: str # e.g. "18/8125" title: str fraktionen: list[str] datum: str # ISO date link: str # PDF URL bundesland: str typ: str = "Antrag" # Antrag, Anfrage, Beschlussempfehlung, etc. class ParlamentAdapter(ABC): """Base adapter for searching parliament documents.""" bundesland: str name: str @abstractmethod async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search for documents matching query.""" pass @abstractmethod async def get_document(self, drucksache: str) -> Optional[Drucksache]: """Get a specific document by ID.""" pass @abstractmethod async def download_text(self, drucksache: str) -> Optional[str]: """Download and extract text from a document.""" pass class NRWAdapter(ParlamentAdapter): """Adapter for NRW Landtag (opal.landtag.nrw.de).""" bundesland = "NRW" name = "Landtag Nordrhein-Westfalen" base_url = "https://opal.landtag.nrw.de" search_url = "https://opal.landtag.nrw.de/home/dokumente/dokumentensuche/parlamentsdokumente/aktuelle-dokumente.html" def _parse_query(self, query: str) -> tuple[str, list[str], bool]: """ Parse search query for AND logic and exact phrases. Returns: (search_term_for_api, filter_terms, is_exact) Examples: - 'Klimaschutz Energie' -> ('Klimaschutz', ['klimaschutz', 'energie'], False) - '"Grüner Stahl"' -> ('Grüner Stahl', ['grüner stahl'], True) - 'Klimaschutz "erneuerbare Energie"' -> ('Klimaschutz', ['klimaschutz', 'erneuerbare energie'], False) """ query = query.strip() # Check for exact phrase (entire query in quotes) if query.startswith('"') and query.endswith('"') and query.count('"') == 2: exact = query[1:-1].strip() return (exact, [exact.lower()], True) # Extract quoted phrases and regular terms import shlex try: parts = shlex.split(query) except ValueError: # Fallback for unbalanced quotes parts = query.split() if not parts: return (query, [query.lower()], False) # Use first term for API search, all terms for filtering filter_terms = [p.lower() for p in parts] return (parts[0], filter_terms, False) def _matches_all_terms(self, doc: 'Drucksache', terms: list[str], is_exact: bool) -> bool: """Check if document matches all search terms (AND logic).""" searchable = f"{doc.title} {doc.drucksache} {' '.join(doc.fraktionen)} {doc.typ}".lower() if is_exact: # Exact phrase must appear return terms[0] in searchable else: # All terms must appear (AND) return all(term in searchable for term in terms) async def search(self, query: str, limit: int = 20) -> list[Drucksache]: """Search NRW Landtag documents via OPAL portal.""" results = [] # Parse query for AND logic api_query, filter_terms, is_exact = self._parse_query(query) async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: # First, get the page to establish session initial = await client.get(self.search_url) if initial.status_code != 200: print(f"NRW search initial request failed: {initial.status_code}") return [] # Parse for webflow token from pagination links soup = BeautifulSoup(initial.text, 'html.parser') # Find a pagination link to extract the webflow token pagination_link = soup.select_one('a[href*="webflowexecution"]') webflow_token = "" webflow_execution = "" if pagination_link: href = pagination_link.get('href', '') # Extract webflowToken and webflowexecution from URL token_match = re.search(r'webflowToken=([^&]*)', href) exec_match = re.search(r'(webflowexecution[^=]+)=([^&]+)', href) if token_match: webflow_token = token_match.group(1) if exec_match: webflow_execution = f"{exec_match.group(1)}={exec_match.group(2)}" # Now perform the search with POST # Find the form action URL with webflow token form = soup.select_one('form#docSearchByItem') form_action = self.search_url if form and form.get('action'): action = form.get('action') if action.startswith('/'): form_action = f"{self.base_url}{action}" elif action.startswith('http'): form_action = action else: form_action = f"{self.search_url}?{action}" # Build form data for "Einfache Suche" (searchByItem form) form_data = { '_eventId_sendform': '1', 'dokNum': api_query, # This is the text search field 'formId': 'searchByItem', 'dokTyp': '', # All types 'wp': '18', # Wahlperiode 18 } # POST request with form data to the form action URL search_resp = await client.post( form_action, data=form_data, cookies=initial.cookies, headers={'Content-Type': 'application/x-www-form-urlencoded'} ) if search_resp.status_code != 200: print(f"NRW search request failed: {search_resp.status_code}") return [] # Parse results soup = BeautifulSoup(search_resp.text, 'html.parser') # Find all document result items (li elements containing articles) items = soup.select('li:has(article)') for item in items[:limit]: try: # Extract drucksache number from first link num_link = item.select_one('a[href*="MMD"]') if not num_link: continue href = num_link.get('href', '') # Extract number: MMD18-12345.pdf -> 18/12345 match = re.search(r'MMD(\d+)-(\d+)\.pdf', href) if not match: continue legislatur, nummer = match.groups() drucksache = f"{legislatur}/{nummer}" pdf_url = f"https://www.landtag.nrw.de{href}" if href.startswith('/') else href # Extract title from the title link (class e-document-result-item__title) title_elem = item.select_one('a.e-document-result-item__title') if title_elem: # Get text content, clean it up title = title_elem.get_text(strip=True) # Remove SVG icon text and clean title = re.sub(r'\s* Optional[Drucksache]: """Get document metadata by drucksache ID (e.g. '18/8125').""" # Parse legislatur and number match = re.match(r"(\d+)/(\d+)", drucksache) if not match: return None legislatur, nummer = match.groups() pdf_url = f"https://www.landtag.nrw.de/portal/WWW/dokumentenarchiv/Dokument/MMD{legislatur}-{nummer}.pdf" # Try to fetch and extract basic info async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: resp = await client.head(pdf_url) if resp.status_code == 200: return Drucksache( drucksache=drucksache, title=f"Drucksache {drucksache}", fraktionen=[], datum="", link=pdf_url, bundesland="NRW", ) except: pass return None async def download_text(self, drucksache: str) -> Optional[str]: """Download PDF and extract text.""" import fitz # PyMuPDF doc = await self.get_document(drucksache) if not doc: return None async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: try: resp = await client.get(doc.link) if resp.status_code != 200: return None # Extract text with PyMuPDF pdf = fitz.open(stream=resp.content, filetype="pdf") text = "" for page in pdf: text += page.get_text() pdf.close() return text except Exception as e: print(f"Error downloading {drucksache}: {e}") return None class BayernAdapter(ParlamentAdapter): """Adapter for Bayerischer Landtag.""" bundesland = "BY" name = "Bayerischer Landtag" base_url = "https://www.bayern.landtag.de" async def search(self, query: str, limit: int = 20) -> list[Drucksache]: # TODO: Implement Bayern search return [] async def get_document(self, drucksache: str) -> Optional[Drucksache]: # TODO: Implement return None async def download_text(self, drucksache: str) -> Optional[str]: return None class BWAdapter(ParlamentAdapter): """Adapter for Baden-Württemberg Landtag.""" bundesland = "BW" name = "Landtag Baden-Württemberg" base_url = "https://www.landtag-bw.de" async def search(self, query: str, limit: int = 20) -> list[Drucksache]: # TODO: Implement BW search return [] async def get_document(self, drucksache: str) -> Optional[Drucksache]: return None async def download_text(self, drucksache: str) -> Optional[str]: return None # Registry of adapters ADAPTERS = { "NRW": NRWAdapter(), "BY": BayernAdapter(), "BW": BWAdapter(), } def get_adapter(bundesland: str) -> Optional[ParlamentAdapter]: """Get adapter for a bundesland.""" return ADAPTERS.get(bundesland) async def search_all(query: str, bundesland: str = "NRW", limit: int = 20) -> list[Drucksache]: """Search parliament documents in a specific state.""" adapter = get_adapter(bundesland) if not adapter: return [] return await adapter.search(query, limit)