diff --git a/app/database.py b/app/database.py index 35fccfc..651039a 100644 --- a/app/database.py +++ b/app/database.py @@ -63,12 +63,38 @@ async def init_db(): ) """) - # Migration: drucksache-Spalte zu jobs (für Queue-Resume nach Restart) + # Migrations cursor = await db.execute("PRAGMA table_info(jobs)") - cols = {r[1] for r in await cursor.fetchall()} - if "drucksache" not in cols: + job_cols = {r[1] for r in await cursor.fetchall()} + if "drucksache" not in job_cols: await db.execute("ALTER TABLE jobs ADD COLUMN drucksache TEXT") + cursor = await db.execute("PRAGMA table_info(assessments)") + ass_cols = {r[1] for r in await cursor.fetchall()} + if "konfidenz" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN konfidenz TEXT") + # #123 Embedding-Migration: Assessment-Zusammenfassungen bekommen + # eigene Embeddings für Clustering (#105) und Ähnlichkeitssuche (#108). + if "summary_embedding" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN summary_embedding BLOB") + if "embedding_model" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN embedding_model TEXT") + # #127: Drucksache-Typ (Original vom Landtag + normiert) + if "typ" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN typ TEXT") + if "typ_normiert" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN typ_normiert TEXT") + # #133: Social-Media-Texte pro Antrag (vom LLM generiert) + if "share_threads" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN share_threads TEXT") + if "share_twitter" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN share_twitter TEXT") + if "share_mastodon" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN share_mastodon TEXT") + # #128: Fraktionen ohne hinterlegtes Wahlprogramm (JSON-Array) + if "fehlende_programme" not in ass_cols: + await db.execute("ALTER TABLE assessments ADD COLUMN fehlende_programme 
TEXT") + # Bookmarks (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS bookmarks ( @@ -79,6 +105,20 @@ async def init_db(): ) """) + # Merkliste — serverseitig persistent (#140) + await db.execute(""" + CREATE TABLE IF NOT EXISTS merkliste ( + user_id TEXT NOT NULL, + antrag_id TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + notiz TEXT, + PRIMARY KEY (user_id, antrag_id) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_merkliste_user ON merkliste(user_id)" + ) + # Kommentare (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS comments ( @@ -95,6 +135,130 @@ async def init_db(): "CREATE INDEX IF NOT EXISTS idx_comments_drucksache ON comments(drucksache)" ) + # Votes / Crowd-Validation (#112) + await db.execute(""" + CREATE TABLE IF NOT EXISTS votes ( + user_id TEXT NOT NULL, + drucksache TEXT NOT NULL, + target TEXT NOT NULL DEFAULT 'overall', + vote TEXT NOT NULL CHECK(vote IN ('up', 'down')), + created_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (user_id, drucksache, target) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_votes_drucksache ON votes(drucksache)" + ) + + # Assessment-Versionshistorie (#110) + await db.execute(""" + CREATE TABLE IF NOT EXISTS assessment_versions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + drucksache TEXT NOT NULL, + version INTEGER NOT NULL, + gwoe_score REAL, + model TEXT, + snapshot TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_versions_drucksache ON assessment_versions(drucksache)" + ) + + # E-Mail-Abonnements (#124) + # bundesland/partei NULL = beides "alle". frequency = 'daily' (weitere + # später). last_sent als ISO-Timestamp, initial leer = sofort beim + # ersten Digest-Lauf eligible. 
+ await db.execute(""" + CREATE TABLE IF NOT EXISTS email_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + email TEXT NOT NULL, + bundesland TEXT, + partei TEXT, + frequency TEXT NOT NULL DEFAULT 'daily', + last_sent TEXT, + created_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_subs_user ON email_subscriptions(user_id)" + ) + + # Monitoring-Tabellen (#135) + await db.execute(""" + CREATE TABLE IF NOT EXISTS monitoring_scans ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bundesland TEXT NOT NULL, + drucksache TEXT NOT NULL, + title TEXT, + datum TEXT, + typ TEXT, + typ_normiert TEXT, + fraktionen TEXT, -- JSON array + link TEXT, + is_assessed INTEGER NOT NULL DEFAULT 0, -- JOIN-Flag, aktuell nicht im Report + seen_first_at TEXT NOT NULL, + last_seen_at TEXT NOT NULL, + UNIQUE(bundesland, drucksache) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_monitoring_scans_bl ON monitoring_scans(bundesland)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_monitoring_scans_first ON monitoring_scans(seen_first_at)" + ) + + await db.execute(""" + CREATE TABLE IF NOT EXISTS monitoring_daily_summary ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + scan_date TEXT NOT NULL, -- ISO-Datum YYYY-MM-DD + bundesland TEXT NOT NULL, + total_seen INTEGER NOT NULL DEFAULT 0, + new_count INTEGER NOT NULL DEFAULT 0, + errors TEXT, -- Adapter-Fehlermeldungen (NULL = kein Fehler) + created_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(scan_date, bundesland) + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_monitoring_summary_date ON monitoring_daily_summary(scan_date)" + ) + + # abgeordnetenwatch-Abstimmungsdaten (#106 Phase 1) + await db.execute(""" + CREATE TABLE IF NOT EXISTS abgeordnetenwatch_polls ( + poll_id INTEGER PRIMARY KEY, + parliament_id INTEGER NOT NULL, + bundesland TEXT NOT NULL, + drucksache TEXT, + titel TEXT, + datum DATE, + accepted BOOLEAN, 
+ topics TEXT, + legislature_label TEXT, + synced_at TIMESTAMP + ) + """) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_aw_polls_ds " + "ON abgeordnetenwatch_polls(drucksache)" + ) + + await db.execute(""" + CREATE TABLE IF NOT EXISTS abgeordnetenwatch_votes ( + poll_id INTEGER NOT NULL, + politician_id INTEGER NOT NULL, + politician_name TEXT, + partei TEXT, + vote TEXT, + PRIMARY KEY (poll_id, politician_id), + FOREIGN KEY (poll_id) REFERENCES abgeordnetenwatch_polls(poll_id) + ) + """) + await db.commit() @@ -134,6 +298,160 @@ async def get_bookmarks(user_id: str) -> list[str]: return [r[0] for r in await rows.fetchall()] +# ─── Merkliste-Functions (#140) ───────────────────────────────────────────── + +async def merkliste_add(user_id: str, antrag_id: str, notiz: Optional[str] = None) -> dict: + """Eintrag zur Merkliste hinzufügen (Upsert). Gibt den Eintrag zurück.""" + async with aiosqlite.connect(settings.db_path) as db: + await db.execute( + """INSERT INTO merkliste (user_id, antrag_id, notiz) + VALUES (?, ?, ?) + ON CONFLICT(user_id, antrag_id) DO UPDATE SET notiz = COALESCE(excluded.notiz, merkliste.notiz)""", + (user_id, antrag_id, notiz), + ) + await db.commit() + db.row_factory = aiosqlite.Row + row = await db.execute( + "SELECT antrag_id, notiz, created_at FROM merkliste WHERE user_id=? AND antrag_id=?", + (user_id, antrag_id), + ) + r = await row.fetchone() + return dict(r) if r else {"antrag_id": antrag_id, "notiz": notiz} + + +async def merkliste_remove(user_id: str, antrag_id: str) -> bool: + """Eintrag aus der Merkliste entfernen. Gibt True zurück wenn gelöscht.""" + async with aiosqlite.connect(settings.db_path) as db: + cur = await db.execute( + "DELETE FROM merkliste WHERE user_id=? 
async def merkliste_list(user_id: str) -> list[dict]:
    """Return all watch-list entries of a user, newest first."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT antrag_id, notiz, created_at FROM merkliste WHERE user_id=? ORDER BY created_at DESC",
            (user_id,),
        )
        fetched = await cursor.fetchall()
        return [dict(record) for record in fetched]


async def merkliste_bulk_add(user_id: str, entries: list[dict]) -> int:
    """Add several entries at once; used for the localStorage migration.

    Each entry must contain ``antrag_id``; ``notiz`` is optional. Existing
    rows are left untouched (ON CONFLICT DO NOTHING). Returns the number
    of processed entries.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        processed = 0
        for item in entries:
            antrag_id = item.get("antrag_id")
            if not antrag_id:
                # Malformed entry without an id — skip silently.
                continue
            await db.execute(
                """INSERT INTO merkliste (user_id, antrag_id, notiz)
                   VALUES (?, ?, ?)
                   ON CONFLICT(user_id, antrag_id) DO NOTHING""",
                (user_id, antrag_id, item.get("notiz")),
            )
            processed += 1
        await db.commit()
        return processed


# ─── Email-Subscription-Functions (#124) ────────────────────────────────────

async def create_subscription(
    user_id: str,
    email: str,
    bundesland: Optional[str] = None,
    partei: Optional[str] = None,
    frequency: str = "daily",
) -> int:
    """Create a new subscription and return its id."""
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "INSERT INTO email_subscriptions (user_id, email, bundesland, partei, frequency) "
            "VALUES (?, ?, ?, ?, ?)",
            (user_id, email, bundesland, partei, frequency),
        )
        await db.commit()
        return cursor.lastrowid
async def list_subscriptions(user_id: str) -> list[dict]:
    """Return all subscriptions belonging to *user_id*, newest first."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT id, email, bundesland, partei, frequency, last_sent, created_at "
            "FROM email_subscriptions WHERE user_id=? ORDER BY created_at DESC",
            (user_id,),
        )
        records = await cursor.fetchall()
        return [dict(record) for record in records]


async def list_all_subscriptions() -> list[dict]:
    """Return every subscription of every user — intended for admin endpoints only."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT id, user_id, email, bundesland, partei, frequency, last_sent, created_at "
            "FROM email_subscriptions ORDER BY created_at DESC"
        )
        records = await cursor.fetchall()
        return [dict(record) for record in records]


async def delete_subscription(user_id: str, sub_id: int) -> bool:
    """Delete a subscription, verifying ownership via the user_id filter.

    Returns True if a row was removed (i.e. the id existed AND belonged
    to this user).
    """
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "DELETE FROM email_subscriptions WHERE id=? AND user_id=?",
            (sub_id, user_id),
        )
        await db.commit()
        return cursor.rowcount > 0
async def delete_subscription_by_id(sub_id: int) -> bool:
    """Delete a subscription by id alone (for unsubscribe-token links, no user check).

    Returns True if a row was removed.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        cur = await db.execute(
            "DELETE FROM email_subscriptions WHERE id=?", (sub_id,)
        )
        await db.commit()
        return cur.rowcount > 0


async def get_all_subscriptions_due(frequency: str = "daily") -> list[dict]:
    """Return all subscriptions of the given frequency that need a digest.

    A row is due when ``last_sent`` is NULL (never sent) or older than
    23 hours.  Note the threshold is 23h, not 24h — presumably one hour
    of slack so a daily scheduler that fires slightly early does not
    skip a day; the previous docstring incorrectly said 24h.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        rows = await db.execute(
            "SELECT * FROM email_subscriptions "
            "WHERE frequency = ? "
            "  AND (last_sent IS NULL OR datetime(last_sent) < datetime('now', '-23 hours'))",
            (frequency,),
        )
        return [dict(r) for r in await rows.fetchall()]


async def mark_subscription_sent(sub_id: int) -> None:
    """Stamp ``last_sent`` with the current time (SQLite ``datetime('now')``, UTC)."""
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            "UPDATE email_subscriptions SET last_sent = datetime('now') WHERE id = ?",
            (sub_id,),
        )
        await db.commit()


# ─── Assessment-History (#110) ────────────────────────────────────────────


async def get_assessment_history(drucksache: str) -> list[dict]:
    """Return the version history for an assessment, newest version first.

    Only lightweight metadata is returned (version, gwoe_score, model,
    created_at) — the full JSON ``snapshot`` column is deliberately not
    selected here.
    """
    # Fix: dropped the unused `import json as _json` the original carried —
    # nothing in this function parses JSON.
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        rows = await db.execute(
            "SELECT version, gwoe_score, model, created_at FROM assessment_versions "
            "WHERE drucksache=? ORDER BY version DESC",
            (drucksache,),
        )
        return [dict(r) for r in await rows.fetchall()]
async def upsert_vote(user_id: str, drucksache: str, target: str, vote: str) -> dict:
    """Set or toggle a vote. Returns the resulting vote state.

    Casting the same vote twice removes it (toggle off → ``{"vote": None}``);
    a different vote replaces the previous one via INSERT OR REPLACE on the
    (user_id, drucksache, target) primary key.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        existing = await db.execute(
            "SELECT vote FROM votes WHERE user_id=? AND drucksache=? AND target=?",
            (user_id, drucksache, target),
        )
        row = await existing.fetchone()
        if row and row[0] == vote:
            # Same vote again → remove (toggle off)
            await db.execute(
                "DELETE FROM votes WHERE user_id=? AND drucksache=? AND target=?",
                (user_id, drucksache, target),
            )
            await db.commit()
            return {"vote": None}
        await db.execute(
            "INSERT OR REPLACE INTO votes (user_id, drucksache, target, vote) VALUES (?, ?, ?, ?)",
            (user_id, drucksache, target, vote),
        )
        await db.commit()
        return {"vote": vote}


async def get_votes(drucksache: str, user_id: Optional[str] = None) -> dict:
    """Return aggregated votes for a drucksache plus the caller's own votes.

    Fix: annotated ``user_id`` as ``Optional[str]`` — the old ``str = None``
    is an implicit-Optional annotation disallowed by PEP 484.

    Returns ``{"counts": {target: {"up": n, "down": n}}, "my_votes": {target: vote}}``;
    ``my_votes`` is empty when *user_id* is None.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        # Aggregated counts per target
        rows = await db.execute(
            "SELECT target, vote, COUNT(*) as cnt FROM votes "
            "WHERE drucksache=? GROUP BY target, vote",
            (drucksache,),
        )
        counts: dict = {}
        for target, vote, cnt in await rows.fetchall():
            counts.setdefault(target, {"up": 0, "down": 0})[vote] = cnt

        # The caller's own votes, if a user is given
        my_votes: dict = {}
        if user_id:
            rows = await db.execute(
                "SELECT target, vote FROM votes WHERE drucksache=? AND user_id=?",
                (drucksache, user_id),
            )
            for target, vote in await rows.fetchall():
                my_votes[target] = vote

        return {"counts": counts, "my_votes": my_votes}
AND user_id=?", + (drucksache, user_id), + ) + for r in await rows.fetchall(): + my_votes[r[0]] = r[1] + + return {"counts": counts, "my_votes": my_votes} + + async def create_job( job_id: str, input_preview: str, @@ -263,11 +654,34 @@ async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]: async def upsert_assessment(data: dict) -> bool: - """Insert or update an assessment.""" + """Insert or update an assessment. Archives old version if exists (#110).""" import json now = datetime.utcnow().isoformat() - + async with aiosqlite.connect(settings.db_path) as db: + # Alte Version archivieren, falls vorhanden + drucksache = data.get("drucksache") + db.row_factory = aiosqlite.Row + old = await db.execute("SELECT * FROM assessments WHERE drucksache=?", (drucksache,)) + old_row = await old.fetchone() + if old_row: + old_dict = dict(old_row) + # Version = bisherige Anzahl + 1 + ver_count = await db.execute( + "SELECT COUNT(*) FROM assessment_versions WHERE drucksache=?", (drucksache,) + ) + version = (await ver_count.fetchone())[0] + 1 + await db.execute( + "INSERT INTO assessment_versions (drucksache, version, gwoe_score, model, snapshot) " + "VALUES (?, ?, ?, ?, ?)", + (drucksache, version, old_dict.get("gwoe_score"), old_dict.get("model"), + json.dumps(old_dict, ensure_ascii=False, default=str)), + ) + db.row_factory = None + # #123: Assessment-Embedding (optional, kann None sein wenn Embedding- + # API gerade down war — wird vom Backfill-Script später nachgezogen) + summary_embedding = data.get("summary_embedding") # bytes | None + embedding_model = data.get("embedding_model") # str | None await db.execute(""" INSERT INTO assessments ( drucksache, title, fraktionen, datum, link, bundesland, @@ -275,13 +689,24 @@ async def upsert_assessment(data: dict) -> bool: wahlprogramm_scores, verbesserungen, staerken, schwaechen, empfehlung, empfehlung_symbol, verbesserungspotenzial, themen, antrag_zusammenfassung, antrag_kernpunkte, - source, model, created_at, 
updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + source, model, konfidenz, + summary_embedding, embedding_model, + share_threads, share_twitter, share_mastodon, + fehlende_programme, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(drucksache) DO UPDATE SET title = excluded.title, gwoe_score = excluded.gwoe_score, gwoe_begruendung = excluded.gwoe_begruendung, gwoe_matrix = excluded.gwoe_matrix, + konfidenz = excluded.konfidenz, + summary_embedding = COALESCE(excluded.summary_embedding, assessments.summary_embedding), + embedding_model = COALESCE(excluded.embedding_model, assessments.embedding_model), + share_threads = COALESCE(excluded.share_threads, assessments.share_threads), + share_twitter = COALESCE(excluded.share_twitter, assessments.share_twitter), + share_mastodon = COALESCE(excluded.share_mastodon, assessments.share_mastodon), + fehlende_programme = excluded.fehlende_programme, updated_at = excluded.updated_at """, ( data.get("drucksache"), @@ -306,6 +731,13 @@ async def upsert_assessment(data: dict) -> bool: json.dumps(data.get("antragKernpunkte", [])), data.get("source", "webapp"), data.get("model"), + data.get("konfidenz"), + summary_embedding, + embedding_model, + data.get("share_threads"), + data.get("share_twitter"), + data.get("share_mastodon"), + json.dumps(data.get("fehlendeProgramme", [])), now, now )) await db.commit() @@ -324,9 +756,10 @@ async def get_assessment(drucksache: str) -> Optional[dict]: if row: d = dict(row) # Parse JSON fields - for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt", - "wahlprogramm_scores", "verbesserungen", "staerken", - "schwaechen", "themen", "antrag_kernpunkte"]: + for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt", + "wahlprogramm_scores", "verbesserungen", "staerken", + "schwaechen", "themen", "antrag_kernpunkte", + "fehlende_programme"]: if 
async def upsert_monitoring_scan(
    bundesland: str,
    drucksache: str,
    title: Optional[str],
    datum: Optional[str],
    typ: Optional[str],
    typ_normiert: Optional[str],
    fraktionen: list,
    link: Optional[str],
    now: str,
) -> bool:
    """Upsert one monitoring hit.

    ``seen_first_at`` stays stable after the first INSERT; ``last_seen_at``
    is refreshed on every run. Returns True when the row was newly created
    (new), False when an existing row was merely touched (already known).
    """
    import json as _json
    async with aiosqlite.connect(settings.db_path) as db:
        lookup = await db.execute(
            "SELECT id FROM monitoring_scans WHERE bundesland=? AND drucksache=?",
            (bundesland, drucksache),
        )
        known = await lookup.fetchone()
        if known is not None:
            # Already tracked → only bump the last-seen timestamp.
            await db.execute(
                "UPDATE monitoring_scans SET last_seen_at=? WHERE bundesland=? AND drucksache=?",
                (now, bundesland, drucksache),
            )
            await db.commit()
            return False
        await db.execute(
            """INSERT INTO monitoring_scans
               (bundesland, drucksache, title, datum, typ, typ_normiert,
                fraktionen, link, seen_first_at, last_seen_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                bundesland, drucksache, title, datum, typ, typ_normiert,
                _json.dumps(fraktionen or [], ensure_ascii=False),
                link, now, now,
            ),
        )
        await db.commit()
        return True


async def upsert_monitoring_summary(
    scan_date: str,
    bundesland: str,
    total_seen: int,
    new_count: int,
    errors: Optional[str],
) -> None:
    """Upsert the daily summary row for one Bundesland (keyed on scan_date+bundesland)."""
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            """INSERT INTO monitoring_daily_summary
               (scan_date, bundesland, total_seen, new_count, errors)
               VALUES (?, ?, ?, ?, ?)
               ON CONFLICT(scan_date, bundesland) DO UPDATE SET
                   total_seen = excluded.total_seen,
                   new_count = excluded.new_count,
                   errors = excluded.errors""",
            (scan_date, bundesland, total_seen, new_count, errors),
        )
        await db.commit()


async def get_monitoring_summary(scan_date: str) -> list[dict]:
    """Return all per-Bundesland summaries for one date, ordered by Bundesland."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM monitoring_daily_summary WHERE scan_date=? ORDER BY bundesland",
            (scan_date,),
        )
        summaries = await cursor.fetchall()
        return [dict(summary) for summary in summaries]
async def upsert_aw_poll(
    poll_id: int,
    parliament_id: int,
    bundesland: str,
    drucksache: Optional[str],
    titel: Optional[str],
    datum: Optional[str],
    accepted: Optional[bool],
    topics: list,
    legislature_label: Optional[str],
    synced_at: str,
) -> bool:
    """UPSERT one abgeordnetenwatch poll.

    Returns True when the row was newly created, False on update.
    """
    import json as _json
    # Fix: the old `1 if accepted else 0` collapsed accepted=None (outcome
    # unknown) into 0 ("rejected"). Preserve the tri-state — the `accepted`
    # column is a nullable BOOLEAN, so unknown must be stored as NULL.
    accepted_db = None if accepted is None else (1 if accepted else 0)
    async with aiosqlite.connect(settings.db_path) as db:
        cur = await db.execute(
            "SELECT poll_id FROM abgeordnetenwatch_polls WHERE poll_id=?",
            (poll_id,),
        )
        existing = await cur.fetchone()
        if existing:
            await db.execute(
                """UPDATE abgeordnetenwatch_polls
                   SET drucksache=?, titel=?, datum=?, accepted=?, topics=?,
                       legislature_label=?, synced_at=?
                   WHERE poll_id=?""",
                (
                    drucksache, titel, datum,
                    accepted_db,
                    _json.dumps(topics or [], ensure_ascii=False),
                    legislature_label, synced_at, poll_id,
                ),
            )
            await db.commit()
            return False
        await db.execute(
            """INSERT INTO abgeordnetenwatch_polls
               (poll_id, parliament_id, bundesland, drucksache, titel,
                datum, accepted, topics, legislature_label, synced_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                poll_id, parliament_id, bundesland, drucksache, titel,
                datum,
                accepted_db,
                _json.dumps(topics or [], ensure_ascii=False),
                legislature_label, synced_at,
            ),
        )
        await db.commit()
        return True


async def upsert_aw_vote(
    poll_id: int,
    politician_id: int,
    politician_name: Optional[str],
    partei: Optional[str],
    vote: str,
) -> bool:
    """UPSERT one individual politician vote.

    Returns True when the row was newly created, False on update.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        cur = await db.execute(
            "SELECT poll_id FROM abgeordnetenwatch_votes WHERE poll_id=? AND politician_id=?",
            (poll_id, politician_id),
        )
        existing = await cur.fetchone()
        if existing:
            await db.execute(
                """UPDATE abgeordnetenwatch_votes
                   SET politician_name=?, partei=?, vote=?
                   WHERE poll_id=? AND politician_id=?""",
                (politician_name, partei, vote, poll_id, politician_id),
            )
            await db.commit()
            return False
        await db.execute(
            """INSERT INTO abgeordnetenwatch_votes
               (poll_id, politician_id, politician_name, partei, vote)
               VALUES (?, ?, ?, ?, ?)""",
            (poll_id, politician_id, politician_name, partei, vote),
        )
        await db.commit()
        return True
async def get_abstimmungsverhalten(drucksache: str) -> Optional[dict]:
    """Return per-party vote aggregates (yes/no/abstain per party).

    Looks up the poll for *drucksache* in abgeordnetenwatch_polls and
    aggregates the individual votes from abgeordnetenwatch_votes by party.

    Returns:
        Dict with ``poll_id``, ``titel``, ``datum``, ``accepted`` and
        ``fraktionen`` (list of {partei, yes, no, abstain, no_show}).
        None when no voting data exists.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        poll_cursor = await db.execute(
            "SELECT * FROM abgeordnetenwatch_polls WHERE drucksache=? LIMIT 1",
            (drucksache,),
        )
        poll_row = await poll_cursor.fetchone()
        if poll_row is None:
            return None
        poll = dict(poll_row)

        vote_cursor = await db.execute(
            """SELECT partei,
                      SUM(CASE WHEN vote='yes' THEN 1 ELSE 0 END) AS yes,
                      SUM(CASE WHEN vote='no' THEN 1 ELSE 0 END) AS no,
                      SUM(CASE WHEN vote='abstain' THEN 1 ELSE 0 END) AS abstain,
                      SUM(CASE WHEN vote='no_show' THEN 1 ELSE 0 END) AS no_show
               FROM abgeordnetenwatch_votes
               WHERE poll_id=?
               GROUP BY partei
               ORDER BY (yes + no + abstain + no_show) DESC""",
            (poll["poll_id"],),
        )
        fraktionen = []
        for record in await vote_cursor.fetchall():
            fraktionen.append({
                "partei": record["partei"] or "Unbekannt",
                "yes": record["yes"],
                "no": record["no"],
                "abstain": record["abstain"],
                "no_show": record["no_show"],
            })

        # NOTE(review): bool() maps a NULL `accepted` to False here, i.e. an
        # unknown outcome reads as "rejected" — confirm callers expect that.
        return {
            "poll_id": poll["poll_id"],
            "titel": poll["titel"],
            "datum": poll["datum"],
            "accepted": bool(poll["accepted"]),
            "fraktionen": fraktionen,
        }
async def get_monitoring_new_today(scan_date: str) -> list[dict]:
    """Return all monitoring_scans rows first seen on *scan_date*.

    Matching is done by ISO-date prefix (``seen_first_at LIKE 'YYYY-MM-DD%'``);
    the stored JSON ``fraktionen`` column is decoded into a list where possible.
    """
    import json as _json
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM monitoring_scans WHERE seen_first_at LIKE ? ORDER BY bundesland, drucksache",
            (f"{scan_date}%",),
        )
        entries = []
        for raw in await cursor.fetchall():
            entry = dict(raw)
            stored_fraktionen = entry.get("fraktionen")
            if stored_fraktionen:
                try:
                    entry["fraktionen"] = _json.loads(stored_fraktionen)
                except Exception:
                    # Keep the raw string if it is not valid JSON.
                    pass
            entries.append(entry)
        return entries