gwoe-antragspruefer/app/auswertungen.py
Dotty Dotter 5eabe0d9b3 feat: Stimmverhalten × Gemeinwohl-Orientierung in /auswertungen
Neue Auswertungs-Sicht: Welche Fraktionen stimmen häufiger gemeinwohl-
orientierten Anträgen zu? Verschneidet GWÖ-Bewertung pro Antrag mit
dem tatsächlichen Plenum-Stimmverhalten der Fraktionen.

Vier Aussagen, alle hinter dem neuen Tab "Stimmverhalten":

1. **Gemeinwohl-Stimm-Index** pro Fraktion: Ø-GWÖ-Score der JA-Anträge
   minus Ø-GWÖ-Score der NEIN-Anträge. Domain −10..+10. Positiv = stimmt
   eher Gemeinwohl-affinen Anträgen zu.

2. **Heuchelei-Quote** pro Fraktion: Anteil der Anträge mit
   wahlprogramm_score ≥ 7 (passt zum eigenen Wahlprogramm), bei denen
   die Fraktion trotzdem NEIN gestimmt hat.

3. **Stimm-Index pro GWÖ-Wert** als Heatmap: 5 Spalten (Würde,
   Solidarität, Nachhaltigkeit, Gerechtigkeit, Demokratie) aus den
   gwoe_matrix-Suffix-Spalten. Domain −5..+5 pro Zelle.

4. **Cross-BL-Vergleich** als Grouped Bar: gleiche Fraktion in
   mehreren Ländern. Nur Fraktionen in ≥2 BL mit ausreichender
   Datenbasis.

Querschnitt:
- `exclude_antragsteller=True` per Default (Toggle-Checkbox in UI),
  weil Antragsteller-Fraktionen quasi immer JA stimmen → würde Index
  verzerren. Toggle macht den Effekt sichtbar.
- `min_n=5` pro Fraktion fuer Stimm-Index, n=3 fuer Heatmaps.
  Fraktionen unter dem Cutoff werden als "Nicht aussagekräftig" separat
  gelistet.
- Caveat-Banner mit `n_assessments_matched` über jedem Chart.

Implementation:
- `app/auswertungen.py`: `_load_assessments_with_votes()` JOIN-Helper
  + 4 Aggregat-Funktionen analog zu `aggregate_matrix`-Pattern.
  Reuse: `normalize_partei` für Aliasing (BÜNDNIS 90/DIE GRÜNEN →
  GRÜNE), `wahlperiode_for` für WP-Filter.
- `app/main.py`: 4 neue read-only GET-Endpoints unter
  `/api/auswertungen/stimm-index|heuchelei|stimm-index-pro-wert|
  stimm-index-cross-bl`.
- `app/templates/v2/screens/auswertungen.html`: 4. Tab "Stimmverhalten"
  mit 4 Sub-Sektionen, Chart.js Bars + HTML-Heatmap-Tabelle.
- `tests/test_auswertungen_stimmverhalten.py`: 18 neue Tests
  (Fixture-DB mit 13 Assessments + 13 Vote-Results, Edge-Cases:
  GRÜNE-positiver-Index, AfD-negativer-Index, exclude_antragsteller-
  Effekt, min_n-Cutoff, leere DB).

Sparse-Data-Realität: aktuell 35 Assessments im prod, dünne Datenbasis
fuer einige Fraktionen. Feature wächst mit Issue #44 Batch-Bewertung.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 15:30:02 +02:00

658 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Aggregations-Funktionen für die Auswertungen-Seite (#58).
Liest direkt aus ``data/gwoe-antraege.db`` (assessments-Tabelle) und baut
drei Sichten:
1. ``aggregate_matrix(filter_wp=None)`` — 2D-Matrix Bundesland × Partei
mit (n, Ø-GWÖ-Score). Filterbar nach Wahlperiode.
2. ``aggregate_zeitreihe(bundesland, partei)`` — Score-Verlauf einer
(BL, Partei)-Kombination über alle bekannten WPs.
3. ``export_long_format()`` — Long-Format-Tabelle für CSV-Export
(deckt zusätzlich Issue #45 ab).
Partei-Auflösung läuft strikt über ``app.parteien.normalize_partei`` —
ohne den Mapper aus #55 würde z.B. BB-FW mit RP-FW in einen Topf
gerührt.
"""
from __future__ import annotations
import csv
import io
import json
import sqlite3
from collections import defaultdict
from pathlib import Path
from typing import Optional
from .config import settings
from .parteien import normalize_partei
from .wahlperioden import wahlperiode_for
# ─────────────────────────────────────────────────────────────────────────────
# Datenstrukturen
# ─────────────────────────────────────────────────────────────────────────────
def _load_assessments(db_path: Optional[Path] = None) -> list[dict]:
"""Lese alle Assessments aus der SQLite-DB. Kein Filter — die
Aggregations-Funktionen filtern selbst. Kein async, weil die
Sicht synchron berechnet werden kann."""
path = db_path or settings.db_path
if not Path(path).exists():
return []
conn = sqlite3.connect(str(path))
try:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""
SELECT drucksache, bundesland, datum, fraktionen, gwoe_score
FROM assessments
WHERE gwoe_score IS NOT NULL
"""
).fetchall()
finally:
conn.close()
out: list[dict] = []
for r in rows:
try:
fraktionen = json.loads(r["fraktionen"]) if r["fraktionen"] else []
except (json.JSONDecodeError, TypeError):
fraktionen = []
out.append({
"drucksache": r["drucksache"],
"bundesland": r["bundesland"],
"datum": r["datum"] or "",
"fraktionen": fraktionen,
"gwoe_score": r["gwoe_score"],
})
return out
# ─────────────────────────────────────────────────────────────────────────────
# 1. Matrix Bundesland × Partei
# ─────────────────────────────────────────────────────────────────────────────
def aggregate_matrix(
filter_wp: Optional[str] = None,
filter_bl: Optional[str] = None,
db_path: Optional[Path] = None,
) -> dict:
"""Aggregate assessments to a 2D matrix.
Returns:
``{
"bundeslaender": [...],
"parteien": [...],
"cells": {
"<bl>": {"<partei>": {"n": int, "avg": float}}
},
"filter_wp": <filter_wp> | None,
"filter_bl": <filter_bl> | None,
"total": int,
}``
``filter_wp`` ist eine ``"<BL>-WP<n>"``-Kennung wie ``"NRW-WP18"``;
nur Assessments dieser Wahlperiode fließen ein. ``None`` = keine
WP-Einschränkung (alle WPs zusammen).
``filter_bl`` schränkt auf ein Bundesland ein (z.B. ``"NRW"``);
``None`` = alle Bundesländer.
"""
rows = _load_assessments(db_path)
bundeslaender: set[str] = set()
parteien: set[str] = set()
sums: defaultdict[tuple[str, str], float] = defaultdict(float)
counts: defaultdict[tuple[str, str], int] = defaultdict(int)
total = 0
for row in rows:
bl = row["bundesland"]
if not bl:
continue
if filter_bl is not None and bl != filter_bl:
continue
if filter_wp is not None:
wp = wahlperiode_for(row["datum"], bl)
if wp != filter_wp:
continue
bundeslaender.add(bl)
for raw_partei in row["fraktionen"]:
canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei
parteien.add(canonical)
key = (bl, canonical)
sums[key] += row["gwoe_score"]
counts[key] += 1
total += 1
cells: dict[str, dict[str, dict]] = {}
for (bl, partei), s in sums.items():
n = counts[(bl, partei)]
cells.setdefault(bl, {})[partei] = {
"n": n,
"avg": round(s / n, 2) if n else None,
}
return {
"bundeslaender": sorted(bundeslaender),
"parteien": sorted(parteien),
"cells": cells,
"filter_wp": filter_wp,
"filter_bl": filter_bl,
"total": total,
}
# ─────────────────────────────────────────────────────────────────────────────
# 1b. Hilfsfunktion: Liste aller bekannten Wahlperioden
# ─────────────────────────────────────────────────────────────────────────────
def get_wahlperioden(db_path: Optional[Path] = None) -> list[str]:
"""Gibt alle bekannten Wahlperioden aus den vorhandenen Assessments zurück,
aufsteigend sortiert."""
rows = _load_assessments(db_path)
wps: set[str] = set()
for r in rows:
wp = wahlperiode_for(r["drucksache"], r["bundesland"])
if wp:
wps.add(wp)
return sorted(wps)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Zeitreihe pro (BL, Partei) über alle Wahlperioden
# ─────────────────────────────────────────────────────────────────────────────
def aggregate_zeitreihe(
bundesland: str,
partei: str,
db_path: Optional[Path] = None,
) -> dict:
"""Score-Verlauf einer (BL, Partei)-Kombination über alle WPs.
Returns:
``{
"bundesland": str,
"partei": str,
"wahlperioden": [
{"wp": "<BL>-WP<n>", "n": int, "avg": float},
...
]
}``
"""
rows = _load_assessments(db_path)
sums: defaultdict[str, float] = defaultdict(float)
counts: defaultdict[str, int] = defaultdict(int)
for row in rows:
if row["bundesland"] != bundesland:
continue
canonical_partei_in_row = {
normalize_partei(p, bundesland=bundesland) or p
for p in row["fraktionen"]
}
if partei not in canonical_partei_in_row:
continue
wp = wahlperiode_for(row["datum"], bundesland)
if wp is None:
continue
sums[wp] += row["gwoe_score"]
counts[wp] += 1
wps = sorted(sums.keys())
return {
"bundesland": bundesland,
"partei": partei,
"wahlperioden": [
{"wp": wp, "n": counts[wp], "avg": round(sums[wp] / counts[wp], 2)}
for wp in wps
],
}
# ─────────────────────────────────────────────────────────────────────────────
# 3. Long-Format-Export für CSV (deckt #45 mit ab)
# ─────────────────────────────────────────────────────────────────────────────
def export_long_format(db_path: Optional[Path] = None) -> str:
"""Long-Format-CSV-Export aller Assessments für externe Auswertung.
Spalten: ``drucksache,bundesland,wahlperiode,datum,partei,gwoe_score``.
Eine Zeile pro (drucksache, partei) — wenn ein Antrag mehrere
Fraktionen hat (Koalitionsanträge), erscheinen entsprechend mehrere
Zeilen mit identischer Drucksache.
"""
rows = _load_assessments(db_path)
buf = io.StringIO()
writer = csv.writer(buf, dialect="excel")
writer.writerow(["drucksache", "bundesland", "wahlperiode", "datum", "partei", "gwoe_score"])
for r in rows:
bl = r["bundesland"] or ""
wp = wahlperiode_for(r["datum"], bl) if bl else ""
for raw_partei in r["fraktionen"]:
canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei
writer.writerow([
r["drucksache"], bl, wp or "", r["datum"], canonical,
f"{r['gwoe_score']:.2f}",
])
return buf.getvalue()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Stimmverhalten × Gemeinwohl-Orientierung (#106 + #145 Folge)
#
# JOIN ueber assessments.gwoe_score × plenum_vote_results.fraktionen_ja|_nein|
# _enthaltung. Vier Sichten:
# - aggregate_stimm_index: pro Fraktion JA-Mean minus NEIN-Mean von gwoe_score
# - aggregate_heuchelei: pro Fraktion % der Antraege mit
# wahlprogramm_score>=7 wo Vote=NEIN
# - aggregate_stimm_index_pro_wert: stimm_index pro (Fraktion, GWOe-Wert)
# aus gwoe_matrix-Spalten
# - aggregate_stimm_index_cross_bl: stimm_index pro (Fraktion, BL)
#
# Sparse-Data-Realitaet: 35 Assessments × 7000 Votes → meiste Anträge mit Vote
# haben kein Assessment. min_n-Cutoff filtert Fraktionen mit zu wenig Daten.
# ─────────────────────────────────────────────────────────────────────────────
# Spalten der GWOe-Matrix: {field-suffix → Wert-Name}. Die Field-IDs in der DB
# sind ``A1..E5`` mit Reihe = Beruehrungsgruppe (A-E), Spalte = Wert (1-5).
GWOE_WERTE = {
"1": "Menschenwürde",
"2": "Solidarität",
"3": "Ökologische Nachhaltigkeit",
"4": "Soziale Gerechtigkeit",
"5": "Transparenz & Demokratie",
}
def _load_assessments_with_votes(
filter_bl: Optional[str] = None,
filter_wp: Optional[str] = None,
db_path: Optional[Path] = None,
) -> list[dict]:
"""JOIN assessments × plenum_vote_results auf (bundesland, drucksache).
Liefert pro Match-Zeile alle relevanten Felder fuer die Stimmverhalten-
Aggregationen — nur Assessments mit gwoe_score und Vote-Result.
Mehrfach-Votes pro Drucksache (Compound-PK ueber quelle_protokoll)
erzeugen entsprechend mehrere Zeilen.
"""
path = db_path or settings.db_path
if not Path(path).exists():
return []
conn = sqlite3.connect(str(path))
try:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""
SELECT a.drucksache, a.bundesland, a.datum,
a.fraktionen, a.gwoe_score, a.gwoe_matrix,
a.gwoe_schwerpunkt, a.wahlprogramm_scores,
p.fraktionen_ja, p.fraktionen_nein, p.fraktionen_enthaltung,
p.quelle_protokoll
FROM assessments a
INNER JOIN plenum_vote_results p
ON a.bundesland = p.bundesland
AND a.drucksache = p.drucksache
WHERE a.gwoe_score IS NOT NULL
"""
).fetchall()
finally:
conn.close()
out: list[dict] = []
for r in rows:
bl = r["bundesland"] or ""
if filter_bl is not None and bl != filter_bl:
continue
if filter_wp is not None:
wp = wahlperiode_for(r["datum"], bl)
if wp != filter_wp:
continue
def _parse_json_list(raw):
try:
return json.loads(raw) if raw else []
except (json.JSONDecodeError, TypeError):
return []
def _norm_set(raw_list):
"""Normalisiere eine Fraktionsliste auf kanonische Namen."""
return {
normalize_partei(p, bundesland=bl) or p
for p in raw_list if p
}
antragsteller = _norm_set(_parse_json_list(r["fraktionen"]))
ja = _norm_set(_parse_json_list(r["fraktionen_ja"]))
nein = _norm_set(_parse_json_list(r["fraktionen_nein"]))
enth = _norm_set(_parse_json_list(r["fraktionen_enthaltung"]))
out.append({
"drucksache": r["drucksache"],
"bundesland": bl,
"datum": r["datum"] or "",
"gwoe_score": r["gwoe_score"],
"gwoe_matrix": _parse_json_list(r["gwoe_matrix"]),
"gwoe_schwerpunkt": _parse_json_list(r["gwoe_schwerpunkt"]),
"wahlprogramm_scores": _parse_json_list(r["wahlprogramm_scores"]),
"antragsteller": antragsteller,
"ja": ja,
"nein": nein,
"enthaltung": enth,
"quelle_protokoll": r["quelle_protokoll"],
})
return out
def _avg(values: list[float]) -> Optional[float]:
return round(sum(values) / len(values), 2) if values else None
def aggregate_stimm_index(
filter_bl: Optional[str] = None,
filter_wp: Optional[str] = None,
exclude_antragsteller: bool = True,
min_n: int = 5,
db_path: Optional[Path] = None,
) -> dict:
"""Pro Fraktion: Ø-GWÖ-Score der JA-Antraege minus Ø-GWÖ-Score der
NEIN-Antraege. Antragsteller-Bias optional rausgerechnet.
``stimm_index`` ist None wenn n_ja oder n_nein = 0; ``ausreichend``
ist True wenn n_ja >= min_n und n_nein >= min_n (Domain-Heuristik:
Aussage erst belastbar bei beidseitigem Mindest-N).
"""
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
ja_scores: defaultdict[str, list[float]] = defaultdict(list)
nein_scores: defaultdict[str, list[float]] = defaultdict(list)
enth_scores: defaultdict[str, list[float]] = defaultdict(list)
for row in rows:
score = row["gwoe_score"]
skip = row["antragsteller"] if exclude_antragsteller else set()
for f in row["ja"] - skip:
ja_scores[f].append(score)
for f in row["nein"] - skip:
nein_scores[f].append(score)
for f in row["enthaltung"] - skip:
enth_scores[f].append(score)
parteien = sorted(set(ja_scores) | set(nein_scores) | set(enth_scores))
fraktionen_out = []
for p in parteien:
n_ja = len(ja_scores[p])
n_nein = len(nein_scores[p])
n_enth = len(enth_scores[p])
avg_ja = _avg(ja_scores[p])
avg_nein = _avg(nein_scores[p])
idx = (round(avg_ja - avg_nein, 2)
if avg_ja is not None and avg_nein is not None else None)
fraktionen_out.append({
"partei": p,
"n_ja": n_ja,
"n_nein": n_nein,
"n_enth": n_enth,
"avg_gwoe_ja": avg_ja,
"avg_gwoe_nein": avg_nein,
"avg_gwoe_enth": _avg(enth_scores[p]),
"stimm_index": idx,
"ausreichend": n_ja >= min_n and n_nein >= min_n,
})
fraktionen_out.sort(
key=lambda f: (f["stimm_index"] if f["stimm_index"] is not None else -999),
reverse=True,
)
return {
"fraktionen": fraktionen_out,
"n_assessments_matched": len({r["drucksache"] for r in rows}),
"n_votes_matched": len(rows),
"filter": {
"bundesland": filter_bl,
"wahlperiode": filter_wp,
"exclude_antragsteller": exclude_antragsteller,
"min_n": min_n,
},
}
def aggregate_heuchelei(
filter_bl: Optional[str] = None,
filter_wp: Optional[str] = None,
score_threshold: float = 7.0,
min_n: int = 5,
db_path: Optional[Path] = None,
) -> dict:
"""Pro Fraktion: Anteil der Antraege mit wahlprogramm_score >= 7
(Antrag passt zum Wahlprogramm), bei denen die Fraktion trotzdem
NEIN gestimmt hat.
Misst Inkonsistenz zwischen Wahlversprechen und Stimmverhalten.
"""
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
n_passt: defaultdict[str, int] = defaultdict(int)
n_passt_nein: defaultdict[str, int] = defaultdict(int)
n_passt_ja: defaultdict[str, int] = defaultdict(int)
n_passt_enth: defaultdict[str, int] = defaultdict(int)
for row in rows:
wp_scores = row["wahlprogramm_scores"] or []
for entry in wp_scores:
if not isinstance(entry, dict):
continue
raw_partei = entry.get("fraktion") or ""
partei = normalize_partei(raw_partei, bundesland=row["bundesland"]) or raw_partei
if not partei:
continue
wp_block = entry.get("wahlprogramm") or {}
score = wp_block.get("score")
if score is None or score < score_threshold:
continue
n_passt[partei] += 1
if partei in row["nein"]:
n_passt_nein[partei] += 1
elif partei in row["ja"]:
n_passt_ja[partei] += 1
elif partei in row["enthaltung"]:
n_passt_enth[partei] += 1
fraktionen_out = []
for partei in sorted(n_passt):
total = n_passt[partei]
nein = n_passt_nein[partei]
ja = n_passt_ja[partei]
enth = n_passt_enth[partei]
quote = round(nein / total, 3) if total else None
fraktionen_out.append({
"partei": partei,
"n_im_programm": total,
"n_nein_trotz_programm": nein,
"n_ja_passt": ja,
"n_enth_passt": enth,
"heuchelei_quote": quote,
"ausreichend": total >= min_n,
})
fraktionen_out.sort(
key=lambda f: (f["heuchelei_quote"] or 0), reverse=True,
)
return {
"fraktionen": fraktionen_out,
"n_assessments_matched": len({r["drucksache"] for r in rows}),
"filter": {
"bundesland": filter_bl,
"wahlperiode": filter_wp,
"score_threshold": score_threshold,
"min_n": min_n,
},
}
def _wert_score_for_assessment(matrix: list[dict]) -> dict[str, float]:
"""Aus der gwoe_matrix-Liste den Mittelwert pro Wert-Spalte (1..5)
berechnen.
matrix-Eintraege haben ``field`` wie ``A1, B3, E5`` und ``rating`` -5..+5.
Pro Spalten-Suffix wird Ø-rating berechnet — fehlende Felder werden
ignoriert (LLM lässt nicht-relevante Zellen oft weg).
"""
by_col: defaultdict[str, list[float]] = defaultdict(list)
for entry in matrix or []:
if not isinstance(entry, dict):
continue
field = entry.get("field") or ""
rating = entry.get("rating")
if not field or rating is None:
continue
if len(field) >= 2 and field[-1] in GWOE_WERTE:
by_col[field[-1]].append(float(rating))
return {col: sum(vals) / len(vals) for col, vals in by_col.items()}
def aggregate_stimm_index_pro_wert(
filter_bl: Optional[str] = None,
filter_wp: Optional[str] = None,
exclude_antragsteller: bool = True,
min_n: int = 5,
db_path: Optional[Path] = None,
) -> dict:
"""Pro (Fraktion, GWÖ-Wert) ein Stimm-Index-Analog: Ø-Wert-Score der
JA-Antraege minus Ø-Wert-Score der NEIN-Antraege.
Wert-Score eines Antrags = Ø(rating der gwoe_matrix-Felder mit dem
entsprechenden Spalten-Suffix). Domain: -5..+5 pro Wert.
"""
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
werte_namen = list(GWOE_WERTE.values()) # in Reihenfolge 1..5
werte_keys = list(GWOE_WERTE.keys())
# Pro (partei, wert_key) → list[wert_score] fuer JA / NEIN
ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
parteien_seen: set[str] = set()
for row in rows:
wert_scores = _wert_score_for_assessment(row["gwoe_matrix"])
if not wert_scores:
continue
skip = row["antragsteller"] if exclude_antragsteller else set()
for f in row["ja"] - skip:
parteien_seen.add(f)
for col, sc in wert_scores.items():
ja[(f, col)].append(sc)
for f in row["nein"] - skip:
parteien_seen.add(f)
for col, sc in wert_scores.items():
nein[(f, col)].append(sc)
parteien = sorted(parteien_seen)
cells: dict[str, dict[str, dict]] = {}
for p in parteien:
cells[p] = {}
for col, wert_name in zip(werte_keys, werte_namen):
n_ja = len(ja[(p, col)])
n_nein = len(nein[(p, col)])
avg_ja = _avg(ja[(p, col)])
avg_nein = _avg(nein[(p, col)])
idx = (round(avg_ja - avg_nein, 2)
if avg_ja is not None and avg_nein is not None else None)
cells[p][wert_name] = {
"stimm_index": idx,
"n_ja": n_ja,
"n_nein": n_nein,
"ausreichend": n_ja >= min_n and n_nein >= min_n,
}
return {
"fraktionen": parteien,
"werte": werte_namen,
"cells": cells,
"n_assessments_matched": len({r["drucksache"] for r in rows}),
"filter": {
"bundesland": filter_bl,
"wahlperiode": filter_wp,
"exclude_antragsteller": exclude_antragsteller,
"min_n": min_n,
},
}
def aggregate_stimm_index_cross_bl(
filter_wp: Optional[str] = None,
exclude_antragsteller: bool = True,
min_n: int = 5,
db_path: Optional[Path] = None,
) -> dict:
"""Stimm-Index pro (Fraktion, Bundesland) — macht regionale Drift
sichtbar fuer bundesweit aktive Fraktionen (CDU, SPD, GRÜNE, AfD,
FDP, LINKE, BSW)."""
rows = _load_assessments_with_votes(None, filter_wp, db_path)
ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
bl_seen: set[str] = set()
parteien_seen: set[str] = set()
for row in rows:
bl = row["bundesland"]
if not bl:
continue
bl_seen.add(bl)
score = row["gwoe_score"]
skip = row["antragsteller"] if exclude_antragsteller else set()
for f in row["ja"] - skip:
parteien_seen.add(f)
ja[(f, bl)].append(score)
for f in row["nein"] - skip:
parteien_seen.add(f)
nein[(f, bl)].append(score)
bundeslaender = sorted(bl_seen)
parteien = sorted(parteien_seen)
cells: dict[str, dict[str, dict]] = {}
for p in parteien:
cells[p] = {}
for bl in bundeslaender:
n_ja = len(ja[(p, bl)])
n_nein = len(nein[(p, bl)])
avg_ja = _avg(ja[(p, bl)])
avg_nein = _avg(nein[(p, bl)])
idx = (round(avg_ja - avg_nein, 2)
if avg_ja is not None and avg_nein is not None else None)
cells[p][bl] = {
"stimm_index": idx,
"n_ja": n_ja,
"n_nein": n_nein,
"ausreichend": n_ja >= min_n and n_nein >= min_n,
}
# Filter Fraktionen mit nur 1 BL → in Phase 1 nicht aufschlussreich
parteien_multi_bl = [
p for p in parteien
if sum(1 for bl in bundeslaender if cells[p][bl]["ausreichend"]) >= 2
]
return {
"fraktionen": parteien_multi_bl,
"fraktionen_alle": parteien,
"bundeslaender": bundeslaender,
"cells": cells,
"n_assessments_matched": len({r["drucksache"] for r in rows}),
"filter": {
"wahlperiode": filter_wp,
"exclude_antragsteller": exclude_antragsteller,
"min_n": min_n,
},
}