Stimm-Index pro Beruehrungsgruppe (Matrix-Zeilen A-E) zusaetzlich zur bestehenden Werte-Aufschluesselung (Spalten 1-5). Toggle-Buttons in der 3. Sub-Section schalten zwischen Werte/Gruppen. - `aggregate_stimm_index_pro_gruppe()` analog zu `_pro_wert`, aber gruppiert nach `field[0]` (A-E) statt `field[-1]` (1-5). - `_gruppen_score_for_assessment()` Helper. - `GET /api/auswertungen/stimm-index-pro-gruppe`. - UI-Toggle "Pro GWÖ-Wert" / "Pro Berührungsgruppe" mit `setMatrixAxis()`. - 6 neue Tests, Suite jetzt 995 grün. Beruehrungsgruppen-Labels (aus app/models.py:MATRIX_LABELS gekuerzt): - A: Ausgelagerte Betriebe / Lieferant:innen - B: Finanzpartner:innen / Steuerzahler:innen - C: Politische Führung / Verwaltung / Ehrenamt - D: Bürger:innen und Wirtschaft - E: Staat, Gesellschaft und Natur Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
902 lines
34 KiB
Python
902 lines
34 KiB
Python
"""Aggregations-Funktionen für die Auswertungen-Seite (#58).
|
||
|
||
Liest direkt aus ``data/gwoe-antraege.db`` (assessments-Tabelle) und baut
|
||
drei Sichten:
|
||
|
||
1. ``aggregate_matrix(filter_wp=None)`` — 2D-Matrix Bundesland × Partei
|
||
mit (n, Ø-GWÖ-Score). Filterbar nach Wahlperiode.
|
||
2. ``aggregate_zeitreihe(bundesland, partei)`` — Score-Verlauf einer
|
||
(BL, Partei)-Kombination über alle bekannten WPs.
|
||
3. ``export_long_format()`` — Long-Format-Tabelle für CSV-Export
|
||
(deckt zusätzlich Issue #45 ab).
|
||
|
||
Partei-Auflösung läuft strikt über ``app.parteien.normalize_partei`` —
|
||
ohne den Mapper aus #55 würde z.B. BB-FW mit RP-FW in einen Topf
|
||
gerührt.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import io
|
||
import json
|
||
import sqlite3
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from .config import settings
|
||
from .parteien import normalize_partei
|
||
from .wahlperioden import wahlperiode_for
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Datenstrukturen
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _load_assessments(db_path: Optional[Path] = None) -> list[dict]:
|
||
"""Lese alle Assessments aus der SQLite-DB. Kein Filter — die
|
||
Aggregations-Funktionen filtern selbst. Kein async, weil die
|
||
Sicht synchron berechnet werden kann."""
|
||
path = db_path or settings.db_path
|
||
if not Path(path).exists():
|
||
return []
|
||
conn = sqlite3.connect(str(path))
|
||
try:
|
||
conn.row_factory = sqlite3.Row
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT drucksache, bundesland, datum, fraktionen, gwoe_score
|
||
FROM assessments
|
||
WHERE gwoe_score IS NOT NULL
|
||
"""
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
out: list[dict] = []
|
||
for r in rows:
|
||
try:
|
||
fraktionen = json.loads(r["fraktionen"]) if r["fraktionen"] else []
|
||
except (json.JSONDecodeError, TypeError):
|
||
fraktionen = []
|
||
out.append({
|
||
"drucksache": r["drucksache"],
|
||
"bundesland": r["bundesland"],
|
||
"datum": r["datum"] or "",
|
||
"fraktionen": fraktionen,
|
||
"gwoe_score": r["gwoe_score"],
|
||
})
|
||
return out
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 1. Matrix Bundesland × Partei
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def aggregate_matrix(
|
||
filter_wp: Optional[str] = None,
|
||
filter_bl: Optional[str] = None,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Aggregate assessments to a 2D matrix.
|
||
|
||
Returns:
|
||
``{
|
||
"bundeslaender": [...],
|
||
"parteien": [...],
|
||
"cells": {
|
||
"<bl>": {"<partei>": {"n": int, "avg": float}}
|
||
},
|
||
"filter_wp": <filter_wp> | None,
|
||
"filter_bl": <filter_bl> | None,
|
||
"total": int,
|
||
}``
|
||
|
||
``filter_wp`` ist eine ``"<BL>-WP<n>"``-Kennung wie ``"NRW-WP18"``;
|
||
nur Assessments dieser Wahlperiode fließen ein. ``None`` = keine
|
||
WP-Einschränkung (alle WPs zusammen).
|
||
|
||
``filter_bl`` schränkt auf ein Bundesland ein (z.B. ``"NRW"``);
|
||
``None`` = alle Bundesländer.
|
||
"""
|
||
rows = _load_assessments(db_path)
|
||
|
||
bundeslaender: set[str] = set()
|
||
parteien: set[str] = set()
|
||
sums: defaultdict[tuple[str, str], float] = defaultdict(float)
|
||
counts: defaultdict[tuple[str, str], int] = defaultdict(int)
|
||
total = 0
|
||
|
||
for row in rows:
|
||
bl = row["bundesland"]
|
||
if not bl:
|
||
continue
|
||
if filter_bl is not None and bl != filter_bl:
|
||
continue
|
||
if filter_wp is not None:
|
||
wp = wahlperiode_for(row["datum"], bl)
|
||
if wp != filter_wp:
|
||
continue
|
||
bundeslaender.add(bl)
|
||
for raw_partei in row["fraktionen"]:
|
||
canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei
|
||
parteien.add(canonical)
|
||
key = (bl, canonical)
|
||
sums[key] += row["gwoe_score"]
|
||
counts[key] += 1
|
||
total += 1
|
||
|
||
cells: dict[str, dict[str, dict]] = {}
|
||
for (bl, partei), s in sums.items():
|
||
n = counts[(bl, partei)]
|
||
cells.setdefault(bl, {})[partei] = {
|
||
"n": n,
|
||
"avg": round(s / n, 2) if n else None,
|
||
}
|
||
|
||
return {
|
||
"bundeslaender": sorted(bundeslaender),
|
||
"parteien": sorted(parteien),
|
||
"cells": cells,
|
||
"filter_wp": filter_wp,
|
||
"filter_bl": filter_bl,
|
||
"total": total,
|
||
}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 1b. Hilfsfunktion: Liste aller bekannten Wahlperioden
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def get_wahlperioden(db_path: Optional[Path] = None) -> list[str]:
|
||
"""Gibt alle bekannten Wahlperioden aus den vorhandenen Assessments zurück,
|
||
aufsteigend sortiert."""
|
||
rows = _load_assessments(db_path)
|
||
wps: set[str] = set()
|
||
for r in rows:
|
||
wp = wahlperiode_for(r["drucksache"], r["bundesland"])
|
||
if wp:
|
||
wps.add(wp)
|
||
return sorted(wps)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 2. Zeitreihe pro (BL, Partei) über alle Wahlperioden
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def aggregate_zeitreihe(
|
||
bundesland: str,
|
||
partei: str,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Score-Verlauf einer (BL, Partei)-Kombination über alle WPs.
|
||
|
||
Returns:
|
||
``{
|
||
"bundesland": str,
|
||
"partei": str,
|
||
"wahlperioden": [
|
||
{"wp": "<BL>-WP<n>", "n": int, "avg": float},
|
||
...
|
||
]
|
||
}``
|
||
"""
|
||
rows = _load_assessments(db_path)
|
||
sums: defaultdict[str, float] = defaultdict(float)
|
||
counts: defaultdict[str, int] = defaultdict(int)
|
||
|
||
for row in rows:
|
||
if row["bundesland"] != bundesland:
|
||
continue
|
||
canonical_partei_in_row = {
|
||
normalize_partei(p, bundesland=bundesland) or p
|
||
for p in row["fraktionen"]
|
||
}
|
||
if partei not in canonical_partei_in_row:
|
||
continue
|
||
wp = wahlperiode_for(row["datum"], bundesland)
|
||
if wp is None:
|
||
continue
|
||
sums[wp] += row["gwoe_score"]
|
||
counts[wp] += 1
|
||
|
||
wps = sorted(sums.keys())
|
||
return {
|
||
"bundesland": bundesland,
|
||
"partei": partei,
|
||
"wahlperioden": [
|
||
{"wp": wp, "n": counts[wp], "avg": round(sums[wp] / counts[wp], 2)}
|
||
for wp in wps
|
||
],
|
||
}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 3. Long-Format-Export für CSV (deckt #45 mit ab)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def export_long_format(db_path: Optional[Path] = None) -> str:
|
||
"""Long-Format-CSV-Export aller Assessments für externe Auswertung.
|
||
|
||
Spalten: ``drucksache,bundesland,wahlperiode,datum,partei,gwoe_score``.
|
||
Eine Zeile pro (drucksache, partei) — wenn ein Antrag mehrere
|
||
Fraktionen hat (Koalitionsanträge), erscheinen entsprechend mehrere
|
||
Zeilen mit identischer Drucksache.
|
||
"""
|
||
rows = _load_assessments(db_path)
|
||
buf = io.StringIO()
|
||
writer = csv.writer(buf, dialect="excel")
|
||
writer.writerow(["drucksache", "bundesland", "wahlperiode", "datum", "partei", "gwoe_score"])
|
||
for r in rows:
|
||
bl = r["bundesland"] or ""
|
||
wp = wahlperiode_for(r["datum"], bl) if bl else ""
|
||
for raw_partei in r["fraktionen"]:
|
||
canonical = normalize_partei(raw_partei, bundesland=bl) or raw_partei
|
||
writer.writerow([
|
||
r["drucksache"], bl, wp or "", r["datum"], canonical,
|
||
f"{r['gwoe_score']:.2f}",
|
||
])
|
||
return buf.getvalue()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 4. Stimmverhalten × Gemeinwohl-Orientierung (#106 + #145 Folge)
|
||
#
|
||
# JOIN ueber assessments.gwoe_score × plenum_vote_results.fraktionen_ja|_nein|
|
||
# _enthaltung. Vier Sichten:
|
||
# - aggregate_stimm_index: pro Fraktion JA-Mean minus NEIN-Mean von gwoe_score
|
||
# - aggregate_heuchelei: pro Fraktion % der Antraege mit
|
||
# wahlprogramm_score>=7 wo Vote=NEIN
|
||
# - aggregate_stimm_index_pro_wert: stimm_index pro (Fraktion, GWOe-Wert)
|
||
# aus gwoe_matrix-Spalten
|
||
# - aggregate_stimm_index_cross_bl: stimm_index pro (Fraktion, BL)
|
||
#
|
||
# Sparse-Data-Realitaet: 35 Assessments × 7000 Votes → meiste Anträge mit Vote
|
||
# haben kein Assessment. min_n-Cutoff filtert Fraktionen mit zu wenig Daten.
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
# Spalten der GWOe-Matrix: {field-suffix → Wert-Name}. Die Field-IDs in der DB
|
||
# sind ``A1..E5`` mit Reihe = Beruehrungsgruppe (A-E), Spalte = Wert (1-5).
|
||
GWOE_WERTE = {
|
||
"1": "Menschenwürde",
|
||
"2": "Solidarität",
|
||
"3": "Ökologische Nachhaltigkeit",
|
||
"4": "Soziale Gerechtigkeit",
|
||
"5": "Transparenz & Demokratie",
|
||
}
|
||
|
||
# Zeilen der GWOe-Matrix: {field-prefix → Beruehrungsgruppe-Label}. Aus
|
||
# app/models.py:MATRIX_LABELS uebernommen, gekuerzt fuer UI-Spalten.
|
||
GWOE_BERUEHRUNGSGRUPPEN = {
|
||
"A": "Ausgelagerte Betriebe / Lieferant:innen",
|
||
"B": "Finanzpartner:innen / Steuerzahler:innen",
|
||
"C": "Politische Führung / Verwaltung / Ehrenamt",
|
||
"D": "Bürger:innen und Wirtschaft",
|
||
"E": "Staat, Gesellschaft und Natur",
|
||
}
|
||
|
||
|
||
def _load_assessments_with_votes(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
db_path: Optional[Path] = None,
|
||
) -> list[dict]:
|
||
"""JOIN assessments × plenum_vote_results auf (bundesland, drucksache).
|
||
|
||
Liefert pro Match-Zeile alle relevanten Felder fuer die Stimmverhalten-
|
||
Aggregationen — nur Assessments mit gwoe_score und Vote-Result.
|
||
Mehrfach-Votes pro Drucksache (Compound-PK ueber quelle_protokoll)
|
||
erzeugen entsprechend mehrere Zeilen.
|
||
"""
|
||
path = db_path or settings.db_path
|
||
if not Path(path).exists():
|
||
return []
|
||
conn = sqlite3.connect(str(path))
|
||
try:
|
||
conn.row_factory = sqlite3.Row
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT a.drucksache, a.bundesland, a.datum,
|
||
a.fraktionen, a.gwoe_score, a.gwoe_matrix,
|
||
a.gwoe_schwerpunkt, a.wahlprogramm_scores,
|
||
p.fraktionen_ja, p.fraktionen_nein, p.fraktionen_enthaltung,
|
||
p.quelle_protokoll
|
||
FROM assessments a
|
||
INNER JOIN plenum_vote_results p
|
||
ON a.bundesland = p.bundesland
|
||
AND a.drucksache = p.drucksache
|
||
WHERE a.gwoe_score IS NOT NULL
|
||
"""
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
out: list[dict] = []
|
||
for r in rows:
|
||
bl = r["bundesland"] or ""
|
||
if filter_bl is not None and bl != filter_bl:
|
||
continue
|
||
if filter_wp is not None:
|
||
wp = wahlperiode_for(r["datum"], bl)
|
||
if wp != filter_wp:
|
||
continue
|
||
|
||
def _parse_json_list(raw):
|
||
try:
|
||
return json.loads(raw) if raw else []
|
||
except (json.JSONDecodeError, TypeError):
|
||
return []
|
||
|
||
def _norm_set(raw_list):
|
||
"""Normalisiere eine Fraktionsliste auf kanonische Namen."""
|
||
return {
|
||
normalize_partei(p, bundesland=bl) or p
|
||
for p in raw_list if p
|
||
}
|
||
|
||
antragsteller = _norm_set(_parse_json_list(r["fraktionen"]))
|
||
ja = _norm_set(_parse_json_list(r["fraktionen_ja"]))
|
||
nein = _norm_set(_parse_json_list(r["fraktionen_nein"]))
|
||
enth = _norm_set(_parse_json_list(r["fraktionen_enthaltung"]))
|
||
|
||
out.append({
|
||
"drucksache": r["drucksache"],
|
||
"bundesland": bl,
|
||
"datum": r["datum"] or "",
|
||
"gwoe_score": r["gwoe_score"],
|
||
"gwoe_matrix": _parse_json_list(r["gwoe_matrix"]),
|
||
"gwoe_schwerpunkt": _parse_json_list(r["gwoe_schwerpunkt"]),
|
||
"wahlprogramm_scores": _parse_json_list(r["wahlprogramm_scores"]),
|
||
"antragsteller": antragsteller,
|
||
"ja": ja,
|
||
"nein": nein,
|
||
"enthaltung": enth,
|
||
"quelle_protokoll": r["quelle_protokoll"],
|
||
})
|
||
return out
|
||
|
||
|
||
def _avg(values: list[float]) -> Optional[float]:
|
||
return round(sum(values) / len(values), 2) if values else None
|
||
|
||
|
||
def aggregate_stimm_index(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
exclude_antragsteller: bool = True,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Pro Fraktion: Ø-GWÖ-Score der JA-Antraege minus Ø-GWÖ-Score der
|
||
NEIN-Antraege. Antragsteller-Bias optional rausgerechnet.
|
||
|
||
``stimm_index`` ist None wenn n_ja oder n_nein = 0; ``ausreichend``
|
||
ist True wenn n_ja >= min_n und n_nein >= min_n (Domain-Heuristik:
|
||
Aussage erst belastbar bei beidseitigem Mindest-N).
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
|
||
ja_scores: defaultdict[str, list[float]] = defaultdict(list)
|
||
nein_scores: defaultdict[str, list[float]] = defaultdict(list)
|
||
enth_scores: defaultdict[str, list[float]] = defaultdict(list)
|
||
|
||
for row in rows:
|
||
score = row["gwoe_score"]
|
||
skip = row["antragsteller"] if exclude_antragsteller else set()
|
||
for f in row["ja"] - skip:
|
||
ja_scores[f].append(score)
|
||
for f in row["nein"] - skip:
|
||
nein_scores[f].append(score)
|
||
for f in row["enthaltung"] - skip:
|
||
enth_scores[f].append(score)
|
||
|
||
parteien = sorted(set(ja_scores) | set(nein_scores) | set(enth_scores))
|
||
fraktionen_out = []
|
||
for p in parteien:
|
||
n_ja = len(ja_scores[p])
|
||
n_nein = len(nein_scores[p])
|
||
n_enth = len(enth_scores[p])
|
||
avg_ja = _avg(ja_scores[p])
|
||
avg_nein = _avg(nein_scores[p])
|
||
idx = (round(avg_ja - avg_nein, 2)
|
||
if avg_ja is not None and avg_nein is not None else None)
|
||
fraktionen_out.append({
|
||
"partei": p,
|
||
"n_ja": n_ja,
|
||
"n_nein": n_nein,
|
||
"n_enth": n_enth,
|
||
"avg_gwoe_ja": avg_ja,
|
||
"avg_gwoe_nein": avg_nein,
|
||
"avg_gwoe_enth": _avg(enth_scores[p]),
|
||
"stimm_index": idx,
|
||
"ausreichend": n_ja >= min_n and n_nein >= min_n,
|
||
})
|
||
|
||
fraktionen_out.sort(
|
||
key=lambda f: (f["stimm_index"] if f["stimm_index"] is not None else -999),
|
||
reverse=True,
|
||
)
|
||
return {
|
||
"fraktionen": fraktionen_out,
|
||
"n_assessments_matched": len({r["drucksache"] for r in rows}),
|
||
"n_votes_matched": len(rows),
|
||
"filter": {
|
||
"bundesland": filter_bl,
|
||
"wahlperiode": filter_wp,
|
||
"exclude_antragsteller": exclude_antragsteller,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
def aggregate_heuchelei(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
score_threshold: float = 7.0,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Pro Fraktion: Anteil der Antraege mit wahlprogramm_score >= 7
|
||
(Antrag passt zum Wahlprogramm), bei denen die Fraktion trotzdem
|
||
NEIN gestimmt hat.
|
||
|
||
Misst Inkonsistenz zwischen Wahlversprechen und Stimmverhalten.
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
|
||
n_passt: defaultdict[str, int] = defaultdict(int)
|
||
n_passt_nein: defaultdict[str, int] = defaultdict(int)
|
||
n_passt_ja: defaultdict[str, int] = defaultdict(int)
|
||
n_passt_enth: defaultdict[str, int] = defaultdict(int)
|
||
|
||
for row in rows:
|
||
wp_scores = row["wahlprogramm_scores"] or []
|
||
for entry in wp_scores:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
raw_partei = entry.get("fraktion") or ""
|
||
partei = normalize_partei(raw_partei, bundesland=row["bundesland"]) or raw_partei
|
||
if not partei:
|
||
continue
|
||
wp_block = entry.get("wahlprogramm") or {}
|
||
score = wp_block.get("score")
|
||
if score is None or score < score_threshold:
|
||
continue
|
||
n_passt[partei] += 1
|
||
if partei in row["nein"]:
|
||
n_passt_nein[partei] += 1
|
||
elif partei in row["ja"]:
|
||
n_passt_ja[partei] += 1
|
||
elif partei in row["enthaltung"]:
|
||
n_passt_enth[partei] += 1
|
||
|
||
fraktionen_out = []
|
||
for partei in sorted(n_passt):
|
||
total = n_passt[partei]
|
||
nein = n_passt_nein[partei]
|
||
ja = n_passt_ja[partei]
|
||
enth = n_passt_enth[partei]
|
||
quote = round(nein / total, 3) if total else None
|
||
fraktionen_out.append({
|
||
"partei": partei,
|
||
"n_im_programm": total,
|
||
"n_nein_trotz_programm": nein,
|
||
"n_ja_passt": ja,
|
||
"n_enth_passt": enth,
|
||
"heuchelei_quote": quote,
|
||
"ausreichend": total >= min_n,
|
||
})
|
||
fraktionen_out.sort(
|
||
key=lambda f: (f["heuchelei_quote"] or 0), reverse=True,
|
||
)
|
||
return {
|
||
"fraktionen": fraktionen_out,
|
||
"n_assessments_matched": len({r["drucksache"] for r in rows}),
|
||
"filter": {
|
||
"bundesland": filter_bl,
|
||
"wahlperiode": filter_wp,
|
||
"score_threshold": score_threshold,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
def _wert_score_for_assessment(matrix: list[dict]) -> dict[str, float]:
|
||
"""Aus der gwoe_matrix-Liste den Mittelwert pro Wert-Spalte (1..5)
|
||
berechnen.
|
||
|
||
matrix-Eintraege haben ``field`` wie ``A1, B3, E5`` und ``rating`` -5..+5.
|
||
Pro Spalten-Suffix wird Ø-rating berechnet — fehlende Felder werden
|
||
ignoriert (LLM lässt nicht-relevante Zellen oft weg).
|
||
"""
|
||
by_col: defaultdict[str, list[float]] = defaultdict(list)
|
||
for entry in matrix or []:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
field = entry.get("field") or ""
|
||
rating = entry.get("rating")
|
||
if not field or rating is None:
|
||
continue
|
||
if len(field) >= 2 and field[-1] in GWOE_WERTE:
|
||
by_col[field[-1]].append(float(rating))
|
||
return {col: sum(vals) / len(vals) for col, vals in by_col.items()}
|
||
|
||
|
||
def _gruppen_score_for_assessment(matrix: list[dict]) -> dict[str, float]:
|
||
"""Mittelwert pro Beruehrungsgruppe-Zeile (A..E). Field-Prefix ist die
|
||
Zeile (`field[0]`), z.B. A1..A5 alle in Gruppe A."""
|
||
by_row: defaultdict[str, list[float]] = defaultdict(list)
|
||
for entry in matrix or []:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
field = entry.get("field") or ""
|
||
rating = entry.get("rating")
|
||
if not field or rating is None:
|
||
continue
|
||
if len(field) >= 2 and field[0] in GWOE_BERUEHRUNGSGRUPPEN:
|
||
by_row[field[0]].append(float(rating))
|
||
return {row: sum(vals) / len(vals) for row, vals in by_row.items()}
|
||
|
||
|
||
def aggregate_stimm_index_pro_wert(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
exclude_antragsteller: bool = True,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Pro (Fraktion, GWÖ-Wert) ein Stimm-Index-Analog: Ø-Wert-Score der
|
||
JA-Antraege minus Ø-Wert-Score der NEIN-Antraege.
|
||
|
||
Wert-Score eines Antrags = Ø(rating der gwoe_matrix-Felder mit dem
|
||
entsprechenden Spalten-Suffix). Domain: -5..+5 pro Wert.
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
|
||
werte_namen = list(GWOE_WERTE.values()) # in Reihenfolge 1..5
|
||
werte_keys = list(GWOE_WERTE.keys())
|
||
|
||
# Pro (partei, wert_key) → list[wert_score] fuer JA / NEIN
|
||
ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
|
||
parteien_seen: set[str] = set()
|
||
|
||
for row in rows:
|
||
wert_scores = _wert_score_for_assessment(row["gwoe_matrix"])
|
||
if not wert_scores:
|
||
continue
|
||
skip = row["antragsteller"] if exclude_antragsteller else set()
|
||
for f in row["ja"] - skip:
|
||
parteien_seen.add(f)
|
||
for col, sc in wert_scores.items():
|
||
ja[(f, col)].append(sc)
|
||
for f in row["nein"] - skip:
|
||
parteien_seen.add(f)
|
||
for col, sc in wert_scores.items():
|
||
nein[(f, col)].append(sc)
|
||
|
||
parteien = sorted(parteien_seen)
|
||
cells: dict[str, dict[str, dict]] = {}
|
||
for p in parteien:
|
||
cells[p] = {}
|
||
for col, wert_name in zip(werte_keys, werte_namen):
|
||
n_ja = len(ja[(p, col)])
|
||
n_nein = len(nein[(p, col)])
|
||
avg_ja = _avg(ja[(p, col)])
|
||
avg_nein = _avg(nein[(p, col)])
|
||
idx = (round(avg_ja - avg_nein, 2)
|
||
if avg_ja is not None and avg_nein is not None else None)
|
||
cells[p][wert_name] = {
|
||
"stimm_index": idx,
|
||
"n_ja": n_ja,
|
||
"n_nein": n_nein,
|
||
"ausreichend": n_ja >= min_n and n_nein >= min_n,
|
||
}
|
||
|
||
return {
|
||
"fraktionen": parteien,
|
||
"werte": werte_namen,
|
||
"cells": cells,
|
||
"n_assessments_matched": len({r["drucksache"] for r in rows}),
|
||
"filter": {
|
||
"bundesland": filter_bl,
|
||
"wahlperiode": filter_wp,
|
||
"exclude_antragsteller": exclude_antragsteller,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
def aggregate_stimm_index_pro_gruppe(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
exclude_antragsteller: bool = True,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Pro (Fraktion, Beruehrungsgruppe A-E) ein Stimm-Index analog zu
|
||
pro_wert, aber gegen den Gruppen-Score statt den Wert-Score (#166).
|
||
|
||
Gruppen-Score eines Antrags = Ø(rating der gwoe_matrix-Felder mit
|
||
dem entsprechenden Zeilen-Prefix). Domain: -5..+5 pro Gruppe.
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
|
||
gruppen_namen = list(GWOE_BERUEHRUNGSGRUPPEN.values())
|
||
gruppen_keys = list(GWOE_BERUEHRUNGSGRUPPEN.keys())
|
||
|
||
ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
parteien_seen: set[str] = set()
|
||
|
||
for row in rows:
|
||
gruppen_scores = _gruppen_score_for_assessment(row["gwoe_matrix"])
|
||
if not gruppen_scores:
|
||
continue
|
||
skip = row["antragsteller"] if exclude_antragsteller else set()
|
||
for f in row["ja"] - skip:
|
||
parteien_seen.add(f)
|
||
for row_key, sc in gruppen_scores.items():
|
||
ja[(f, row_key)].append(sc)
|
||
for f in row["nein"] - skip:
|
||
parteien_seen.add(f)
|
||
for row_key, sc in gruppen_scores.items():
|
||
nein[(f, row_key)].append(sc)
|
||
|
||
parteien = sorted(parteien_seen)
|
||
cells: dict[str, dict[str, dict]] = {}
|
||
for p in parteien:
|
||
cells[p] = {}
|
||
for row_key, gruppen_name in zip(gruppen_keys, gruppen_namen):
|
||
n_ja = len(ja[(p, row_key)])
|
||
n_nein = len(nein[(p, row_key)])
|
||
avg_ja = _avg(ja[(p, row_key)])
|
||
avg_nein = _avg(nein[(p, row_key)])
|
||
idx = (round(avg_ja - avg_nein, 2)
|
||
if avg_ja is not None and avg_nein is not None else None)
|
||
cells[p][gruppen_name] = {
|
||
"stimm_index": idx,
|
||
"n_ja": n_ja,
|
||
"n_nein": n_nein,
|
||
"ausreichend": n_ja >= min_n and n_nein >= min_n,
|
||
}
|
||
|
||
return {
|
||
"fraktionen": parteien,
|
||
"gruppen": gruppen_namen,
|
||
"cells": cells,
|
||
"n_assessments_matched": len({r["drucksache"] for r in rows}),
|
||
"filter": {
|
||
"bundesland": filter_bl,
|
||
"wahlperiode": filter_wp,
|
||
"exclude_antragsteller": exclude_antragsteller,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
def aggregate_empfehlungs_konsistenz(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Pro Fraktion: Anteil der Antraege mit GWÖ-Empfehlung "Uneingeschraenkt
|
||
unterstuetzen" oder "Unterstuetzen mit Aenderungen", bei denen die
|
||
Fraktion trotzdem NEIN gestimmt hat.
|
||
|
||
Orthogonal zu Heuchelei-Score: prueft NICHT gegen Wahlprogramm-Treue,
|
||
sondern gegen die GWÖ-Empfehlung des Systems. Misst Inkonsistenz
|
||
zwischen "GWÖ haelt Antrag fuer gut" und "Fraktion stimmt dagegen".
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
# Empfehlung ist im JOIN-Helper noch nicht — eigener Lookup pro Drucksache.
|
||
# Statt Helper umzubauen: zweite Query auf assessments fuer empfehlung.
|
||
path = db_path or settings.db_path
|
||
if not Path(path).exists():
|
||
return {"fraktionen": [], "n_assessments_matched": 0, "filter": {
|
||
"bundesland": filter_bl, "wahlperiode": filter_wp, "min_n": min_n,
|
||
}}
|
||
conn = sqlite3.connect(str(path))
|
||
try:
|
||
empfehlung_map = {
|
||
(r[0], r[1]): r[2] for r in conn.execute(
|
||
"SELECT bundesland, drucksache, empfehlung FROM assessments"
|
||
).fetchall()
|
||
}
|
||
finally:
|
||
conn.close()
|
||
|
||
POSITIV = {"Uneingeschränkt unterstützen", "Unterstützen mit Änderungen"}
|
||
|
||
n_empfohlen: defaultdict[str, int] = defaultdict(int)
|
||
n_nein: defaultdict[str, int] = defaultdict(int)
|
||
n_ja: defaultdict[str, int] = defaultdict(int)
|
||
n_enth: defaultdict[str, int] = defaultdict(int)
|
||
seen_drucksachen = set()
|
||
|
||
for row in rows:
|
||
empfehlung = empfehlung_map.get((row["bundesland"], row["drucksache"]))
|
||
if empfehlung not in POSITIV:
|
||
continue
|
||
seen_drucksachen.add((row["bundesland"], row["drucksache"]))
|
||
all_voters = row["ja"] | row["nein"] | row["enthaltung"]
|
||
for f in all_voters:
|
||
n_empfohlen[f] += 1
|
||
if f in row["nein"]:
|
||
n_nein[f] += 1
|
||
elif f in row["ja"]:
|
||
n_ja[f] += 1
|
||
elif f in row["enthaltung"]:
|
||
n_enth[f] += 1
|
||
|
||
fraktionen_out = []
|
||
for partei in sorted(n_empfohlen):
|
||
total = n_empfohlen[partei]
|
||
nein = n_nein[partei]
|
||
quote = round(nein / total, 3) if total else None
|
||
fraktionen_out.append({
|
||
"partei": partei,
|
||
"n_empfohlen": total,
|
||
"n_nein_trotz_empfehlung": nein,
|
||
"n_ja": n_ja[partei],
|
||
"n_enth": n_enth[partei],
|
||
"konsistenz_quote": quote,
|
||
"ausreichend": total >= min_n,
|
||
})
|
||
fraktionen_out.sort(
|
||
key=lambda f: (f["konsistenz_quote"] or 0), reverse=True,
|
||
)
|
||
return {
|
||
"fraktionen": fraktionen_out,
|
||
"n_assessments_matched": len(seen_drucksachen),
|
||
"filter": {
|
||
"bundesland": filter_bl,
|
||
"wahlperiode": filter_wp,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
def aggregate_stimm_index_cross_bl(
|
||
filter_wp: Optional[str] = None,
|
||
exclude_antragsteller: bool = True,
|
||
min_n: int = 5,
|
||
db_path: Optional[Path] = None,
|
||
) -> dict:
|
||
"""Stimm-Index pro (Fraktion, Bundesland) — macht regionale Drift
|
||
sichtbar fuer bundesweit aktive Fraktionen (CDU, SPD, GRÜNE, AfD,
|
||
FDP, LINKE, BSW)."""
|
||
rows = _load_assessments_with_votes(None, filter_wp, db_path)
|
||
|
||
ja: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
nein: defaultdict[tuple[str, str], list[float]] = defaultdict(list)
|
||
bl_seen: set[str] = set()
|
||
parteien_seen: set[str] = set()
|
||
|
||
for row in rows:
|
||
bl = row["bundesland"]
|
||
if not bl:
|
||
continue
|
||
bl_seen.add(bl)
|
||
score = row["gwoe_score"]
|
||
skip = row["antragsteller"] if exclude_antragsteller else set()
|
||
for f in row["ja"] - skip:
|
||
parteien_seen.add(f)
|
||
ja[(f, bl)].append(score)
|
||
for f in row["nein"] - skip:
|
||
parteien_seen.add(f)
|
||
nein[(f, bl)].append(score)
|
||
|
||
bundeslaender = sorted(bl_seen)
|
||
parteien = sorted(parteien_seen)
|
||
cells: dict[str, dict[str, dict]] = {}
|
||
for p in parteien:
|
||
cells[p] = {}
|
||
for bl in bundeslaender:
|
||
n_ja = len(ja[(p, bl)])
|
||
n_nein = len(nein[(p, bl)])
|
||
avg_ja = _avg(ja[(p, bl)])
|
||
avg_nein = _avg(nein[(p, bl)])
|
||
idx = (round(avg_ja - avg_nein, 2)
|
||
if avg_ja is not None and avg_nein is not None else None)
|
||
cells[p][bl] = {
|
||
"stimm_index": idx,
|
||
"n_ja": n_ja,
|
||
"n_nein": n_nein,
|
||
"ausreichend": n_ja >= min_n and n_nein >= min_n,
|
||
}
|
||
|
||
# Filter Fraktionen mit nur 1 BL → in Phase 1 nicht aufschlussreich
|
||
parteien_multi_bl = [
|
||
p for p in parteien
|
||
if sum(1 for bl in bundeslaender if cells[p][bl]["ausreichend"]) >= 2
|
||
]
|
||
|
||
return {
|
||
"fraktionen": parteien_multi_bl,
|
||
"fraktionen_alle": parteien,
|
||
"bundeslaender": bundeslaender,
|
||
"cells": cells,
|
||
"n_assessments_matched": len({r["drucksache"] for r in rows}),
|
||
"filter": {
|
||
"wahlperiode": filter_wp,
|
||
"exclude_antragsteller": exclude_antragsteller,
|
||
"min_n": min_n,
|
||
},
|
||
}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 5. CSV-Export der Stimmverhalten-Aggregationen
|
||
#
|
||
# Long-Format-CSV pro Aggregation, analog zu export_long_format(). Macht die
|
||
# Aussagen wissenschaftlich auswertbar (R/pandas/Excel) ohne JSON-Parsing.
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def export_stimmverhalten_csv(
|
||
filter_bl: Optional[str] = None,
|
||
filter_wp: Optional[str] = None,
|
||
exclude_antragsteller: bool = True,
|
||
db_path: Optional[Path] = None,
|
||
) -> str:
|
||
"""Long-Format-CSV: Eine Zeile pro (drucksache, partei, vote).
|
||
|
||
Spalten: drucksache, bundesland, wahlperiode, datum, gwoe_score,
|
||
empfehlung, partei, vote (ja|nein|enthaltung), ist_antragsteller.
|
||
|
||
Eine Zeile pro Fraktion-Stimme — wer also an N Anträgen mit Vote
|
||
teilgenommen hat, hat N Zeilen.
|
||
"""
|
||
rows = _load_assessments_with_votes(filter_bl, filter_wp, db_path)
|
||
# Empfehlung-Map analog aggregate_empfehlungs_konsistenz
|
||
path = db_path or settings.db_path
|
||
empfehlung_map: dict[tuple[str, str], str] = {}
|
||
if Path(path).exists():
|
||
conn = sqlite3.connect(str(path))
|
||
try:
|
||
empfehlung_map = {
|
||
(r[0], r[1]): r[2] or "" for r in conn.execute(
|
||
"SELECT bundesland, drucksache, empfehlung FROM assessments"
|
||
).fetchall()
|
||
}
|
||
finally:
|
||
conn.close()
|
||
|
||
buf = io.StringIO()
|
||
writer = csv.writer(buf, dialect="excel")
|
||
writer.writerow([
|
||
"drucksache", "bundesland", "wahlperiode", "datum",
|
||
"gwoe_score", "empfehlung", "partei", "vote", "ist_antragsteller",
|
||
])
|
||
|
||
for row in rows:
|
||
bl = row["bundesland"]
|
||
wp = wahlperiode_for(row["datum"], bl) if bl else ""
|
||
empfehlung = empfehlung_map.get((bl, row["drucksache"]), "")
|
||
antragsteller = row["antragsteller"]
|
||
for vote_key, voters in [
|
||
("ja", row["ja"]),
|
||
("nein", row["nein"]),
|
||
("enthaltung", row["enthaltung"]),
|
||
]:
|
||
for partei in sorted(voters):
|
||
if exclude_antragsteller and partei in antragsteller:
|
||
continue
|
||
writer.writerow([
|
||
row["drucksache"], bl, wp or "", row["datum"],
|
||
f"{row['gwoe_score']:.2f}",
|
||
empfehlung,
|
||
partei, vote_key,
|
||
"1" if partei in antragsteller else "0",
|
||
])
|
||
return buf.getvalue()
|