#!/usr/bin/env python3 """ GWÖ-Wahlprüfsteine Scraper Extrahiert Antworten aus der ECOnGOOD-Webseite und speichert sie in SQLite. """ import re import sqlite3 import json from pathlib import Path from dataclasses import dataclass from bs4 import BeautifulSoup import requests # Partei-Normalisierung PARTEI_MAPPING = { # Grüne r'bündnis\s*90\s*/?\s*die\s*grünen?': 'Grüne', r'^grüne$': 'Grüne', # Freie Wähler r'freie\s*wähler': 'Freie Wähler', r'^fw$': 'Freie Wähler', r'^upw\s*/\s*freie\s*wähler$': 'Freie Wähler', # CSU r'^csu$': 'CSU', r'csu\s*(und|&)': 'CSU', r'pro\s+.*\s+und\s+csu': 'CSU', # SPD r'^spd$': 'SPD', # FDP r'^fdp$': 'FDP', # ÖDP r'^ödp': 'ÖDP', # Linke r'die\s*linke': 'Linke', # AfD r'^afd$': 'AfD', # Bayernpartei r'bayernpartei': 'Bayernpartei', } @dataclass class Kandidat: vorname: str nachname: str plz: str kommune: str landkreis: str partei_raw: str pdf_url: str antworten: dict # frage_nr -> (ja_nein, erläuterung) def normalize_partei(raw: str) -> tuple[str, bool]: """ Normalisiert Parteinamen. Returns: (normalisierte_partei, ist_wählergemeinschaft) """ raw_lower = raw.lower().strip() for pattern, normalized in PARTEI_MAPPING.items(): if re.search(pattern, raw_lower, re.IGNORECASE): return normalized, False # Wählergemeinschaften erkennen wg_patterns = [ r'wähler', r'liste\s', r'bürger', r'umwelt', r'gemeinschaft', ] for pattern in wg_patterns: if re.search(pattern, raw_lower): return 'Wählergemeinschaft', True # Unbekannt → Wählergemeinschaft return 'Wählergemeinschaft', True def parse_html(html_content: str) -> list[Kandidat]: """Parst die HTML-Seite und extrahiert alle Kandidaten mit Antworten.""" soup = BeautifulSoup(html_content, 'html.parser') kandidaten = [] # Finde die Haupttabelle mit allen Antworten # Die Tabelle hat Spalten: PLZ, Kommune, Landkreis, Vorname, Nachname, Partei, PDF, dann 12 Spalten für 6 Fragen (Ja/Nein + Erläuterung) tables = soup.find_all('table') for table in tables: rows = table.find_all('tr') for row in rows[1:]: # Skip header cells = row.find_all('td') # Mindestens 19 Spalten erwartet (PLZ bis Frage 6 Erläuterung) if len(cells) < 19: continue try: # Extrahiere Basisdaten plz = cells[0].get_text(strip=True) kommune = cells[1].get_text(strip=True) landkreis = cells[2].get_text(strip=True) vorname = cells[3].get_text(strip=True) nachname = cells[4].get_text(strip=True) partei_raw = cells[5].get_text(strip=True) # PDF-Link pdf_link = cells[6].find('a') pdf_url = pdf_link['href'] if pdf_link else None # Antworten (6 Fragen × 2 Spalten = 12 Spalten ab Index 7) antworten = {} for i in range(6): ja_nein_idx = 7 + i * 2 erlaeuterung_idx = 8 + i * 2 if ja_nein_idx < len(cells) and erlaeuterung_idx < len(cells): ja_nein = cells[ja_nein_idx].get_text(strip=True) erlaeuterung = cells[erlaeuterung_idx].get_text(strip=True) # Normalisiere Ja/Nein if ja_nein.lower() in ['ja', 'yes']: ja_nein = 'Ja' elif ja_nein.lower() in ['nein', 'no']: ja_nein = 'Nein' else: ja_nein = None antworten[i + 1] = (ja_nein, erlaeuterung if erlaeuterung and erlaeuterung != '/' else None) if vorname and nachname and partei_raw: kandidaten.append(Kandidat( vorname=vorname, nachname=nachname, plz=plz, kommune=kommune, landkreis=landkreis, partei_raw=partei_raw, pdf_url=pdf_url, antworten=antworten )) except (IndexError, KeyError) as e: continue return kandidaten def fetch_and_parse(url: str) -> list[Kandidat]: """Lädt die Webseite und parst sie.""" response = requests.get(url, timeout=30) response.raise_for_status() return parse_html(response.text) def init_db(db_path: Path) -> sqlite3.Connection: """Initialisiert die Datenbank mit Schema.""" schema_path = Path(__file__).parent / 'schema.sql' conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row with open(schema_path) as f: conn.executescript(f.read()) conn.commit() return conn def save_to_db(conn: sqlite3.Connection, kandidaten: list[Kandidat]): """Speichert Kandidaten und Antworten in der Datenbank.""" cursor = conn.cursor() for k in kandidaten: partei_norm, ist_wg = normalize_partei(k.partei_raw) # Kandidat einfügen cursor.execute(""" INSERT OR REPLACE INTO kandidaten (vorname, nachname, plz, kommune, landkreis, partei_raw, partei_normalisiert, ist_waehlergemeinschaft, pdf_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (k.vorname, k.nachname, k.plz, k.kommune, k.landkreis, k.partei_raw, partei_norm, ist_wg, k.pdf_url)) kandidat_id = cursor.lastrowid # Antworten einfügen for frage_nr, (ja_nein, erlaeuterung) in k.antworten.items(): cursor.execute(""" INSERT OR REPLACE INTO antworten_raw (kandidat_id, frage_id, antwort_kurz, antwort_erlaeuterung) VALUES (?, ?, ?, ?) """, (kandidat_id, frage_nr, ja_nein, erlaeuterung)) conn.commit() return len(kandidaten) def main(): """Hauptfunktion.""" import argparse parser = argparse.ArgumentParser(description='GWÖ-Wahlprüfsteine Scraper') parser.add_argument('--url', default='https://germany.econgood.org/wahlpruefsteine-zu-den-bayerischen-kommunalwahlen-2026', help='URL der Wahlprüfsteine-Seite') parser.add_argument('--html', type=Path, help='Lokale HTML-Datei statt URL') parser.add_argument('--db', type=Path, default=Path(__file__).parent / 'wahlpruefsteine.db', help='Pfad zur SQLite-Datenbank') parser.add_argument('--verbose', '-v', action='store_true', help='Ausführliche Ausgabe') args = parser.parse_args() # HTML laden if args.html: print(f"Lade lokale Datei: {args.html}") with open(args.html) as f: kandidaten = parse_html(f.read()) else: print(f"Lade URL: {args.url}") kandidaten = fetch_and_parse(args.url) print(f"Gefunden: {len(kandidaten)} Kandidat:innen") if args.verbose: for k in kandidaten: partei_norm, _ = normalize_partei(k.partei_raw) print(f" - {k.vorname} {k.nachname} ({k.kommune}) → {partei_norm}") # In DB speichern conn = init_db(args.db) count = save_to_db(conn, kandidaten) conn.close() print(f"Gespeichert: {count} Kandidat:innen in {args.db}") # Statistik conn = sqlite3.connect(args.db) cursor = conn.cursor() print("\nPartei-Verteilung:") for row in cursor.execute(""" SELECT partei_normalisiert, COUNT(*) as n FROM kandidaten GROUP BY partei_normalisiert ORDER BY n DESC """): print(f" {row[0]}: {row[1]}") conn.close() if __name__ == '__main__': main()