"""Aggregations-Funktionen für die Auswertungen-Seite (#58). Liest direkt aus ``data/gwoe-antraege.db`` (assessments-Tabelle) und baut drei Sichten: 1. ``aggregate_matrix(filter_wp=None)`` — 2D-Matrix Bundesland × Partei mit (n, Ø-GWÖ-Score). Filterbar nach Wahlperiode. 2. ``aggregate_zeitreihe(bundesland, partei)`` — Score-Verlauf einer (BL, Partei)-Kombination über alle bekannten WPs. 3. ``export_long_format()`` — Long-Format-Tabelle für CSV-Export (deckt zusätzlich Issue #45 ab). Partei-Auflösung läuft strikt über ``app.parteien.normalize_partei`` — ohne den Mapper aus #55 würde z.B. BB-FW mit RP-FW in einen Topf gerührt. """ from __future__ import annotations import csv import io import json import sqlite3 from collections import defaultdict from pathlib import Path from typing import Optional from .config import settings from .parteien import normalize_partei from .wahlperioden import wahlperiode_for # ───────────────────────────────────────────────────────────────────────────── # Datenstrukturen # ───────────────────────────────────────────────────────────────────────────── def _load_assessments(db_path: Optional[Path] = None) -> list[dict]: """Lese alle Assessments aus der SQLite-DB. Kein Filter — die Aggregations-Funktionen filtern selbst. Kein async, weil die Sicht synchron berechnet werden kann.""" path = db_path or settings.db_path if not Path(path).exists(): return [] conn = sqlite3.connect(str(path)) try: conn.row_factory = sqlite3.Row rows = conn.execute( """ SELECT drucksache, bundesland, datum, fraktionen, gwoe_score FROM assessments WHERE gwoe_score IS NOT NULL """ ).fetchall() finally: conn.close() out: list[dict] = [] for r in rows: try: fraktionen = json.loads(r["fraktionen"]) if r["fraktionen"] else [] except (json.JSONDecodeError, TypeError): fraktionen = [] out.append({ "drucksache": r["drucksache"], "bundesland": r["bundesland"], "datum": r["datum"] or "", "fraktionen": fraktionen, "gwoe_score": r["gwoe_score"], }) return out # ───────────────────────────────────────────────────────────────────────────── # 1. Matrix Bundesland × Partei # ───────────────────────────────────────────────────────────────────────────── def aggregate_matrix( filter_wp: Optional[str] = None, db_path: Optional[Path] = None, ) -> dict: """Aggregate assessments to a 2D matrix. Returns: ``{ "bundeslaender": [...], "parteien": [...], "cells": { "": {"": {"n": int, "avg": float}} }, "filter_wp": | None, "total": int, }`` ``filter_wp`` ist eine ``"-WP"``-Kennung wie ``"NRW-WP18"``; nur Assessments dieser Wahlperiode fließen ein. ``None`` = keine WP-Einschränkung (alle WPs zusammen). """ rows = _load_assessments(db_path) bundeslaender: set[str] = set() parteien: set[str] = set() sums: defaultdict[tuple[str, str], float] = defaultdict(float) counts: defaultdict[tuple[str, str], int] = defaultdict(int) total = 0 for row in rows: bl = row["bundesland"] if not bl: continue if filter_wp is not None: wp = wahlperiode_for(row["datum"], bl) if wp != filter_wp: continue bundeslaender.add(bl) for raw_partei in row["fraktionen"]: canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei parteien.add(canonical) key = (bl, canonical) sums[key] += row["gwoe_score"] counts[key] += 1 total += 1 cells: dict[str, dict[str, dict]] = {} for (bl, partei), s in sums.items(): n = counts[(bl, partei)] cells.setdefault(bl, {})[partei] = { "n": n, "avg": round(s / n, 2) if n else None, } return { "bundeslaender": sorted(bundeslaender), "parteien": sorted(parteien), "cells": cells, "filter_wp": filter_wp, "total": total, } # ───────────────────────────────────────────────────────────────────────────── # 2. Zeitreihe pro (BL, Partei) über alle Wahlperioden # ───────────────────────────────────────────────────────────────────────────── def aggregate_zeitreihe( bundesland: str, partei: str, db_path: Optional[Path] = None, ) -> dict: """Score-Verlauf einer (BL, Partei)-Kombination über alle WPs. Returns: ``{ "bundesland": str, "partei": str, "wahlperioden": [ {"wp": "-WP", "n": int, "avg": float}, ... ] }`` """ rows = _load_assessments(db_path) sums: defaultdict[str, float] = defaultdict(float) counts: defaultdict[str, int] = defaultdict(int) for row in rows: if row["bundesland"] != bundesland: continue canonical_partei_in_row = { normalize_partei(p, bundesland=bundesland) or p for p in row["fraktionen"] } if partei not in canonical_partei_in_row: continue wp = wahlperiode_for(row["datum"], bundesland) if wp is None: continue sums[wp] += row["gwoe_score"] counts[wp] += 1 wps = sorted(sums.keys()) return { "bundesland": bundesland, "partei": partei, "wahlperioden": [ {"wp": wp, "n": counts[wp], "avg": round(sums[wp] / counts[wp], 2)} for wp in wps ], } # ───────────────────────────────────────────────────────────────────────────── # 3. Long-Format-Export für CSV (deckt #45 mit ab) # ───────────────────────────────────────────────────────────────────────────── def export_long_format(db_path: Optional[Path] = None) -> str: """Long-Format-CSV-Export aller Assessments für externe Auswertung. Spalten: ``drucksache,bundesland,wahlperiode,datum,partei,gwoe_score``. Eine Zeile pro (drucksache, partei) — wenn ein Antrag mehrere Fraktionen hat (Koalitionsanträge), erscheinen entsprechend mehrere Zeilen mit identischer Drucksache. """ rows = _load_assessments(db_path) buf = io.StringIO() writer = csv.writer(buf, dialect="excel") writer.writerow(["drucksache", "bundesland", "wahlperiode", "datum", "partei", "gwoe_score"]) for r in rows: bl = r["bundesland"] or "" wp = wahlperiode_for(r["datum"], bl) if bl else "" for raw_partei in r["fraktionen"]: canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei writer.writerow([ r["drucksache"], bl, wp or "", r["datum"], canonical, f"{r['gwoe_score']:.2f}", ]) return buf.getvalue()