"""Aggregations-Funktionen für die Auswertungen-Seite (#58). Liest direkt aus ``data/gwoe-antraege.db`` (assessments-Tabelle) und baut drei Sichten: 1. ``aggregate_matrix(filter_wp=None)`` — 2D-Matrix Bundesland × Partei mit (n, Ø-GWÖ-Score). Filterbar nach Wahlperiode. 2. ``aggregate_zeitreihe(bundesland, partei)`` — Score-Verlauf einer (BL, Partei)-Kombination über alle bekannten WPs. 3. ``export_long_format()`` — Long-Format-Tabelle für CSV-Export (deckt zusätzlich Issue #45 ab). Partei-Auflösung läuft strikt über ``app.parteien.normalize_partei`` — ohne den Mapper aus #55 würde z.B. BB-FW mit RP-FW in einen Topf gerührt. """ from __future__ import annotations import csv import io import json import sqlite3 from collections import defaultdict from pathlib import Path from typing import Optional from .config import settings from .parteien import normalize_partei from .wahlperioden import wahlperiode_for # ───────────────────────────────────────────────────────────────────────────── # Datenstrukturen # ───────────────────────────────────────────────────────────────────────────── def _load_assessments(db_path: Optional[Path] = None) -> list[dict]: """Lese alle Assessments aus der SQLite-DB. Kein Filter — die Aggregations-Funktionen filtern selbst. Kein async, weil die Sicht synchron berechnet werden kann.""" path = db_path or settings.db_path if not Path(path).exists(): return [] conn = sqlite3.connect(str(path)) try: conn.row_factory = sqlite3.Row rows = conn.execute( """ SELECT drucksache, bundesland, datum, fraktionen, gwoe_score FROM assessments WHERE gwoe_score IS NOT NULL """ ).fetchall() finally: conn.close() out: list[dict] = [] for r in rows: try: fraktionen = json.loads(r["fraktionen"]) if r["fraktionen"] else [] except (json.JSONDecodeError, TypeError): fraktionen = [] out.append({ "drucksache": r["drucksache"], "bundesland": r["bundesland"], "datum": r["datum"] or "", "fraktionen": fraktionen, "gwoe_score": r["gwoe_score"], }) return out # ───────────────────────────────────────────────────────────────────────────── # 1. Matrix Bundesland × Partei # ───────────────────────────────────────────────────────────────────────────── def aggregate_matrix( filter_wp: Optional[str] = None, filter_bl: Optional[str] = None, db_path: Optional[Path] = None, ) -> dict: """Aggregate assessments to a 2D matrix. Returns: ``{ "bundeslaender": [...], "parteien": [...], "cells": { "": {"": {"n": int, "avg": float}} }, "filter_wp": | None, "filter_bl": | None, "total": int, }`` ``filter_wp`` ist eine ``"-WP"``-Kennung wie ``"NRW-WP18"``; nur Assessments dieser Wahlperiode fließen ein. ``None`` = keine WP-Einschränkung (alle WPs zusammen). ``filter_bl`` schränkt auf ein Bundesland ein (z.B. ``"NRW"``); ``None`` = alle Bundesländer. """ rows = _load_assessments(db_path) bundeslaender: set[str] = set() parteien: set[str] = set() sums: defaultdict[tuple[str, str], float] = defaultdict(float) counts: defaultdict[tuple[str, str], int] = defaultdict(int) total = 0 for row in rows: bl = row["bundesland"] if not bl: continue if filter_bl is not None and bl != filter_bl: continue if filter_wp is not None: wp = wahlperiode_for(row["datum"], bl) if wp != filter_wp: continue bundeslaender.add(bl) for raw_partei in row["fraktionen"]: canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei parteien.add(canonical) key = (bl, canonical) sums[key] += row["gwoe_score"] counts[key] += 1 total += 1 cells: dict[str, dict[str, dict]] = {} for (bl, partei), s in sums.items(): n = counts[(bl, partei)] cells.setdefault(bl, {})[partei] = { "n": n, "avg": round(s / n, 2) if n else None, } return { "bundeslaender": sorted(bundeslaender), "parteien": sorted(parteien), "cells": cells, "filter_wp": filter_wp, "filter_bl": filter_bl, "total": total, } # ───────────────────────────────────────────────────────────────────────────── # 1b. Hilfsfunktion: Liste aller bekannten Wahlperioden # ───────────────────────────────────────────────────────────────────────────── def get_wahlperioden(db_path: Optional[Path] = None) -> list[str]: """Gibt alle bekannten Wahlperioden aus den vorhandenen Assessments zurück, aufsteigend sortiert.""" rows = _load_assessments(db_path) wps: set[str] = set() for r in rows: wp = wahlperiode_for(r["drucksache"], r["bundesland"]) if wp: wps.add(wp) return sorted(wps) # ───────────────────────────────────────────────────────────────────────────── # 2. Zeitreihe pro (BL, Partei) über alle Wahlperioden # ───────────────────────────────────────────────────────────────────────────── def aggregate_zeitreihe( bundesland: str, partei: str, db_path: Optional[Path] = None, ) -> dict: """Score-Verlauf einer (BL, Partei)-Kombination über alle WPs. Returns: ``{ "bundesland": str, "partei": str, "wahlperioden": [ {"wp": "-WP", "n": int, "avg": float}, ... ] }`` """ rows = _load_assessments(db_path) sums: defaultdict[str, float] = defaultdict(float) counts: defaultdict[str, int] = defaultdict(int) for row in rows: if row["bundesland"] != bundesland: continue canonical_partei_in_row = { normalize_partei(p, bundesland=bundesland) or p for p in row["fraktionen"] } if partei not in canonical_partei_in_row: continue wp = wahlperiode_for(row["datum"], bundesland) if wp is None: continue sums[wp] += row["gwoe_score"] counts[wp] += 1 wps = sorted(sums.keys()) return { "bundesland": bundesland, "partei": partei, "wahlperioden": [ {"wp": wp, "n": counts[wp], "avg": round(sums[wp] / counts[wp], 2)} for wp in wps ], } # ───────────────────────────────────────────────────────────────────────────── # 3. Long-Format-Export für CSV (deckt #45 mit ab) # ───────────────────────────────────────────────────────────────────────────── def export_long_format(db_path: Optional[Path] = None) -> str: """Long-Format-CSV-Export aller Assessments für externe Auswertung. Spalten: ``drucksache,bundesland,wahlperiode,datum,partei,gwoe_score``. Eine Zeile pro (drucksache, partei) — wenn ein Antrag mehrere Fraktionen hat (Koalitionsanträge), erscheinen entsprechend mehrere Zeilen mit identischer Drucksache. """ rows = _load_assessments(db_path) buf = io.StringIO() writer = csv.writer(buf, dialect="excel") writer.writerow(["drucksache", "bundesland", "wahlperiode", "datum", "partei", "gwoe_score"]) for r in rows: bl = r["bundesland"] or "" wp = wahlperiode_for(r["datum"], bl) if bl else "" for raw_partei in r["fraktionen"]: canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei writer.writerow([ r["drucksache"], bl, wp or "", r["datum"], canonical, f"{r['gwoe_score']:.2f}", ]) return buf.getvalue() # ───────────────────────────────────────────────────────────────────────────── # 4. Stimmverhalten × Gemeinwohl-Orientierung (#106 + #145 Folge) # # JOIN ueber assessments.gwoe_score × plenum_vote_results.fraktionen_ja|_nein| # _enthaltung. Vier Sichten: # - aggregate_stimm_index: pro Fraktion JA-Mean minus NEIN-Mean von gwoe_score # - aggregate_heuchelei: pro Fraktion % der Antraege mit # wahlprogramm_score>=7 wo Vote=NEIN # - aggregate_stimm_index_pro_wert: stimm_index pro (Fraktion, GWOe-Wert) # aus gwoe_matrix-Spalten # - aggregate_stimm_index_cross_bl: stimm_index pro (Fraktion, BL) # # Sparse-Data-Realitaet: 35 Assessments × 7000 Votes → meiste Anträge mit Vote # haben kein Assessment. min_n-Cutoff filtert Fraktionen mit zu wenig Daten. # ───────────────────────────────────────────────────────────────────────────── # Spalten der GWOe-Matrix: {field-suffix → Wert-Name}. Die Field-IDs in der DB # sind ``A1..E5`` mit Reihe = Beruehrungsgruppe (A-E), Spalte = Wert (1-5). GWOE_WERTE = { "1": "Menschenwürde", "2": "Solidarität", "3": "Ökologische Nachhaltigkeit", "4": "Soziale Gerechtigkeit", "5": "Transparenz & Demokratie", } def _load_assessments_with_votes( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, db_path: Optional[Path] = None, ) -> list[dict]: """JOIN assessments × plenum_vote_results auf (bundesland, drucksache). Liefert pro Match-Zeile alle relevanten Felder fuer die Stimmverhalten- Aggregationen — nur Assessments mit gwoe_score und Vote-Result. Mehrfach-Votes pro Drucksache (Compound-PK ueber quelle_protokoll) erzeugen entsprechend mehrere Zeilen. """ path = db_path or settings.db_path if not Path(path).exists(): return [] conn = sqlite3.connect(str(path)) try: conn.row_factory = sqlite3.Row rows = conn.execute( """ SELECT a.drucksache, a.bundesland, a.datum, a.fraktionen, a.gwoe_score, a.gwoe_matrix, a.gwoe_schwerpunkt, a.wahlprogramm_scores, p.fraktionen_ja, p.fraktionen_nein, p.fraktionen_enthaltung, p.quelle_protokoll FROM assessments a INNER JOIN plenum_vote_results p ON a.bundesland = p.bundesland AND a.drucksache = p.drucksache WHERE a.gwoe_score IS NOT NULL """ ).fetchall() finally: conn.close() out: list[dict] = [] for r in rows: bl = r["bundesland"] or "" if filter_bl is not None and bl != filter_bl: continue if filter_wp is not None: wp = wahlperiode_for(r["datum"], bl) if wp != filter_wp: continue def _parse_json_list(raw): try: return json.loads(raw) if raw else [] except (json.JSONDecodeError, TypeError): return [] def _norm_set(raw_list): """Normalisiere eine Fraktionsliste auf kanonische Namen.""" return { normalize_partei(p, bundesland=bl) or p for p in raw_list if p } antragsteller = _norm_set(_parse_json_list(r["fraktionen"])) ja = _norm_set(_parse_json_list(r["fraktionen_ja"])) nein = _norm_set(_parse_json_list(r["fraktionen_nein"])) enth = _norm_set(_parse_json_list(r["fraktionen_enthaltung"])) out.append({ "drucksache": r["drucksache"], "bundesland": bl, "datum": r["datum"] or "", "gwoe_score": r["gwoe_score"], "gwoe_matrix": _parse_json_list(r["gwoe_matrix"]), "gwoe_schwerpunkt": _parse_json_list(r["gwoe_schwerpunkt"]), "wahlprogramm_scores": _parse_json_list(r["wahlprogramm_scores"]), "antragsteller": antragsteller, "ja": ja, "nein": nein, "enthaltung": enth, "quelle_protokoll": r["quelle_protokoll"], }) return out def _avg(values: list[float]) -> Optional[float]: return round(sum(values) / len(values), 2) if values else None def aggregate_stimm_index( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, exclude_antragsteller: bool = True, min_n: int = 5, db_path: Optional[Path] = None, ) -> dict: """Pro Fraktion: Ø-GWÖ-Score der JA-Antraege minus Ø-GWÖ-Score der NEIN-Antraege. Antragsteller-Bias optional rausgerechnet. ``stimm_index`` ist None wenn n_ja oder n_nein = 0; ``ausreichend`` ist True wenn n_ja >= min_n und n_nein >= min_n (Domain-Heuristik: Aussage erst belastbar bei beidseitigem Mindest-N). """ rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path) ja_scores: defaultdict[str, list[float]] = defaultdict(list) nein_scores: defaultdict[str, list[float]] = defaultdict(list) enth_scores: defaultdict[str, list[float]] = defaultdict(list) for row in rows: score = row["gwoe_score"] skip = row["antragsteller"] if exclude_antragsteller else set() for f in row["ja"] - skip: ja_scores[f].append(score) for f in row["nein"] - skip: nein_scores[f].append(score) for f in row["enthaltung"] - skip: enth_scores[f].append(score) parteien = sorted(set(ja_scores) | set(nein_scores) | set(enth_scores)) fraktionen_out = [] for p in parteien: n_ja = len(ja_scores[p]) n_nein = len(nein_scores[p]) n_enth = len(enth_scores[p]) avg_ja = _avg(ja_scores[p]) avg_nein = _avg(nein_scores[p]) idx = (round(avg_ja - avg_nein, 2) if avg_ja is not None and avg_nein is not None else None) fraktionen_out.append({ "partei": p, "n_ja": n_ja, "n_nein": n_nein, "n_enth": n_enth, "avg_gwoe_ja": avg_ja, "avg_gwoe_nein": avg_nein, "avg_gwoe_enth": _avg(enth_scores[p]), "stimm_index": idx, "ausreichend": n_ja >= min_n and n_nein >= min_n, }) fraktionen_out.sort( key=lambda f: (f["stimm_index"] if f["stimm_index"] is not None else -999), reverse=True, ) return { "fraktionen": fraktionen_out, "n_assessments_matched": len({r["drucksache"] for r in rows}), "n_votes_matched": len(rows), "filter": { "bundesland": filter_bl, "wahlperiode": filter_wp, "exclude_antragsteller": exclude_antragsteller, "min_n": min_n, }, } def aggregate_heuchelei( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, score_threshold: float = 7.0, min_n: int = 5, db_path: Optional[Path] = None, ) -> dict: """Pro Fraktion: Anteil der Antraege mit wahlprogramm_score >= 7 (Antrag passt zum Wahlprogramm), bei denen die Fraktion trotzdem NEIN gestimmt hat. Misst Inkonsistenz zwischen Wahlversprechen und Stimmverhalten. """ rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path) n_passt: defaultdict[str, int] = defaultdict(int) n_passt_nein: defaultdict[str, int] = defaultdict(int) n_passt_ja: defaultdict[str, int] = defaultdict(int) n_passt_enth: defaultdict[str, int] = defaultdict(int) for row in rows: wp_scores = row["wahlprogramm_scores"] or [] for entry in wp_scores: if not isinstance(entry, dict): continue raw_partei = entry.get("fraktion") or "" partei = normalize_partei(raw_partei, bundesland=row["bundesland"]) or raw_partei if not partei: continue wp_block = entry.get("wahlprogramm") or {} score = wp_block.get("score") if score is None or score < score_threshold: continue n_passt[partei] += 1 if partei in row["nein"]: n_passt_nein[partei] += 1 elif partei in row["ja"]: n_passt_ja[partei] += 1 elif partei in row["enthaltung"]: n_passt_enth[partei] += 1 fraktionen_out = [] for partei in sorted(n_passt): total = n_passt[partei] nein = n_passt_nein[partei] ja = n_passt_ja[partei] enth = n_passt_enth[partei] quote = round(nein / total, 3) if total else None fraktionen_out.append({ "partei": partei, "n_im_programm": total, "n_nein_trotz_programm": nein, "n_ja_passt": ja, "n_enth_passt": enth, "heuchelei_quote": quote, "ausreichend": total >= min_n, }) fraktionen_out.sort( key=lambda f: (f["heuchelei_quote"] or 0), reverse=True, ) return { "fraktionen": fraktionen_out, "n_assessments_matched": len({r["drucksache"] for r in rows}), "filter": { "bundesland": filter_bl, "wahlperiode": filter_wp, "score_threshold": score_threshold, "min_n": min_n, }, } def _wert_score_for_assessment(matrix: list[dict]) -> dict[str, float]: """Aus der gwoe_matrix-Liste den Mittelwert pro Wert-Spalte (1..5) berechnen. matrix-Eintraege haben ``field`` wie ``A1, B3, E5`` und ``rating`` -5..+5. Pro Spalten-Suffix wird Ø-rating berechnet — fehlende Felder werden ignoriert (LLM lässt nicht-relevante Zellen oft weg). """ by_col: defaultdict[str, list[float]] = defaultdict(list) for entry in matrix or []: if not isinstance(entry, dict): continue field = entry.get("field") or "" rating = entry.get("rating") if not field or rating is None: continue if len(field) >= 2 and field[-1] in GWOE_WERTE: by_col[field[-1]].append(float(rating)) return {col: sum(vals) / len(vals) for col, vals in by_col.items()} def aggregate_stimm_index_pro_wert( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, exclude_antragsteller: bool = True, min_n: int = 5, db_path: Optional[Path] = None, ) -> dict: """Pro (Fraktion, GWÖ-Wert) ein Stimm-Index-Analog: Ø-Wert-Score der JA-Antraege minus Ø-Wert-Score der NEIN-Antraege. Wert-Score eines Antrags = Ø(rating der gwoe_matrix-Felder mit dem entsprechenden Spalten-Suffix). Domain: -5..+5 pro Wert. """ rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path) werte_namen = list(GWOE_WERTE.values()) # in Reihenfolge 1..5 werte_keys = list(GWOE_WERTE.keys()) # Pro (partei, wert_key) → list[wert_score] fuer JA / NEIN ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list) nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list) parteien_seen: set[str] = set() for row in rows: wert_scores = _wert_score_for_assessment(row["gwoe_matrix"]) if not wert_scores: continue skip = row["antragsteller"] if exclude_antragsteller else set() for f in row["ja"] - skip: parteien_seen.add(f) for col, sc in wert_scores.items(): ja[(f, col)].append(sc) for f in row["nein"] - skip: parteien_seen.add(f) for col, sc in wert_scores.items(): nein[(f, col)].append(sc) parteien = sorted(parteien_seen) cells: dict[str, dict[str, dict]] = {} for p in parteien: cells[p] = {} for col, wert_name in zip(werte_keys, werte_namen): n_ja = len(ja[(p, col)]) n_nein = len(nein[(p, col)]) avg_ja = _avg(ja[(p, col)]) avg_nein = _avg(nein[(p, col)]) idx = (round(avg_ja - avg_nein, 2) if avg_ja is not None and avg_nein is not None else None) cells[p][wert_name] = { "stimm_index": idx, "n_ja": n_ja, "n_nein": n_nein, "ausreichend": n_ja >= min_n and n_nein >= min_n, } return { "fraktionen": parteien, "werte": werte_namen, "cells": cells, "n_assessments_matched": len({r["drucksache"] for r in rows}), "filter": { "bundesland": filter_bl, "wahlperiode": filter_wp, "exclude_antragsteller": exclude_antragsteller, "min_n": min_n, }, } def aggregate_empfehlungs_konsistenz( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, min_n: int = 5, db_path: Optional[Path] = None, ) -> dict: """Pro Fraktion: Anteil der Antraege mit GWÖ-Empfehlung "Uneingeschraenkt unterstuetzen" oder "Unterstuetzen mit Aenderungen", bei denen die Fraktion trotzdem NEIN gestimmt hat. Orthogonal zu Heuchelei-Score: prueft NICHT gegen Wahlprogramm-Treue, sondern gegen die GWÖ-Empfehlung des Systems. Misst Inkonsistenz zwischen "GWÖ haelt Antrag fuer gut" und "Fraktion stimmt dagegen". """ rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path) # Empfehlung ist im JOIN-Helper noch nicht — eigener Lookup pro Drucksache. # Statt Helper umzubauen: zweite Query auf assessments fuer empfehlung. path = db_path or settings.db_path if not Path(path).exists(): return {"fraktionen": [], "n_assessments_matched": 0, "filter": { "bundesland": filter_bl, "wahlperiode": filter_wp, "min_n": min_n, }} conn = sqlite3.connect(str(path)) try: empfehlung_map = { (r[0], r[1]): r[2] for r in conn.execute( "SELECT bundesland, drucksache, empfehlung FROM assessments" ).fetchall() } finally: conn.close() POSITIV = {"Uneingeschränkt unterstützen", "Unterstützen mit Änderungen"} n_empfohlen: defaultdict[str, int] = defaultdict(int) n_nein: defaultdict[str, int] = defaultdict(int) n_ja: defaultdict[str, int] = defaultdict(int) n_enth: defaultdict[str, int] = defaultdict(int) seen_drucksachen = set() for row in rows: empfehlung = empfehlung_map.get((row["bundesland"], row["drucksache"])) if empfehlung not in POSITIV: continue seen_drucksachen.add((row["bundesland"], row["drucksache"])) all_voters = row["ja"] | row["nein"] | row["enthaltung"] for f in all_voters: n_empfohlen[f] += 1 if f in row["nein"]: n_nein[f] += 1 elif f in row["ja"]: n_ja[f] += 1 elif f in row["enthaltung"]: n_enth[f] += 1 fraktionen_out = [] for partei in sorted(n_empfohlen): total = n_empfohlen[partei] nein = n_nein[partei] quote = round(nein / total, 3) if total else None fraktionen_out.append({ "partei": partei, "n_empfohlen": total, "n_nein_trotz_empfehlung": nein, "n_ja": n_ja[partei], "n_enth": n_enth[partei], "konsistenz_quote": quote, "ausreichend": total >= min_n, }) fraktionen_out.sort( key=lambda f: (f["konsistenz_quote"] or 0), reverse=True, ) return { "fraktionen": fraktionen_out, "n_assessments_matched": len(seen_drucksachen), "filter": { "bundesland": filter_bl, "wahlperiode": filter_wp, "min_n": min_n, }, } def aggregate_stimm_index_cross_bl( filter_wp: Optional[str] = None, exclude_antragsteller: bool = True, min_n: int = 5, db_path: Optional[Path] = None, ) -> dict: """Stimm-Index pro (Fraktion, Bundesland) — macht regionale Drift sichtbar fuer bundesweit aktive Fraktionen (CDU, SPD, GRÜNE, AfD, FDP, LINKE, BSW).""" rows = _load_assessments_with_votes(None, filter_wp, db_path) ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list) nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list) bl_seen: set[str] = set() parteien_seen: set[str] = set() for row in rows: bl = row["bundesland"] if not bl: continue bl_seen.add(bl) score = row["gwoe_score"] skip = row["antragsteller"] if exclude_antragsteller else set() for f in row["ja"] - skip: parteien_seen.add(f) ja[(f, bl)].append(score) for f in row["nein"] - skip: parteien_seen.add(f) nein[(f, bl)].append(score) bundeslaender = sorted(bl_seen) parteien = sorted(parteien_seen) cells: dict[str, dict[str, dict]] = {} for p in parteien: cells[p] = {} for bl in bundeslaender: n_ja = len(ja[(p, bl)]) n_nein = len(nein[(p, bl)]) avg_ja = _avg(ja[(p, bl)]) avg_nein = _avg(nein[(p, bl)]) idx = (round(avg_ja - avg_nein, 2) if avg_ja is not None and avg_nein is not None else None) cells[p][bl] = { "stimm_index": idx, "n_ja": n_ja, "n_nein": n_nein, "ausreichend": n_ja >= min_n and n_nein >= min_n, } # Filter Fraktionen mit nur 1 BL → in Phase 1 nicht aufschlussreich parteien_multi_bl = [ p for p in parteien if sum(1 for bl in bundeslaender if cells[p][bl]["ausreichend"]) >= 2 ] return { "fraktionen": parteien_multi_bl, "fraktionen_alle": parteien, "bundeslaender": bundeslaender, "cells": cells, "n_assessments_matched": len({r["drucksache"] for r in rows}), "filter": { "wahlperiode": filter_wp, "exclude_antragsteller": exclude_antragsteller, "min_n": min_n, }, } # ───────────────────────────────────────────────────────────────────────────── # 5. CSV-Export der Stimmverhalten-Aggregationen # # Long-Format-CSV pro Aggregation, analog zu export_long_format(). Macht die # Aussagen wissenschaftlich auswertbar (R/pandas/Excel) ohne JSON-Parsing. # ───────────────────────────────────────────────────────────────────────────── def export_stimmverhalten_csv( filter_bl: Optional[str] = None, filter_wp: Optional[str] = None, exclude_antragsteller: bool = True, db_path: Optional[Path] = None, ) -> str: """Long-Format-CSV: Eine Zeile pro (drucksache, partei, vote). Spalten: drucksache, bundesland, wahlperiode, datum, gwoe_score, empfehlung, partei, vote (ja|nein|enthaltung), ist_antragsteller. Eine Zeile pro Fraktion-Stimme — wer also an N Anträgen mit Vote teilgenommen hat, hat N Zeilen. """ rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path) # Empfehlung-Map analog aggregate_empfehlungs_konsistenz path = db_path or settings.db_path empfehlung_map: dict[tuple[str, str], str] = {} if Path(path).exists(): conn = sqlite3.connect(str(path)) try: empfehlung_map = { (r[0], r[1]): r[2] or "" for r in conn.execute( "SELECT bundesland, drucksache, empfehlung FROM assessments" ).fetchall() } finally: conn.close() buf = io.StringIO() writer = csv.writer(buf, dialect="excel") writer.writerow([ "drucksache", "bundesland", "wahlperiode", "datum", "gwoe_score", "empfehlung", "partei", "vote", "ist_antragsteller", ]) for row in rows: bl = row["bundesland"] wp = wahlperiode_for(row["datum"], bl) if bl else "" empfehlung = empfehlung_map.get((bl, row["drucksache"]), "") antragsteller = row["antragsteller"] for vote_key, voters in [ ("ja", row["ja"]), ("nein", row["nein"]), ("enthaltung", row["enthaltung"]), ]: for partei in sorted(voters): if exclude_antragsteller and partei in antragsteller: continue writer.writerow([ row["drucksache"], bl, wp or "", row["datum"], f"{row['gwoe_score']:.2f}", empfehlung, partei, vote_key, "1" if partei in antragsteller else "0", ]) return buf.getvalue()