"""SQLite database for job tracking.""" import aiosqlite from datetime import datetime from typing import Optional from .config import settings async def init_db(): """Initialize database with tables.""" async with aiosqlite.connect(settings.db_path) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'queued', input_preview TEXT, bundesland TEXT DEFAULT 'NRW', model TEXT DEFAULT 'qwen-plus', result TEXT, html_path TEXT, pdf_path TEXT, error TEXT, user_id TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Assessments table for pre-computed + new analyses await db.execute(""" CREATE TABLE IF NOT EXISTS assessments ( drucksache TEXT PRIMARY KEY, title TEXT, fraktionen TEXT, -- JSON array datum TEXT, link TEXT, bundesland TEXT DEFAULT 'NRW', gwoe_score REAL, gwoe_begruendung TEXT, gwoe_matrix TEXT, -- JSON array gwoe_schwerpunkt TEXT, -- JSON array wahlprogramm_scores TEXT, -- JSON array verbesserungen TEXT, -- JSON array staerken TEXT, -- JSON array schwaechen TEXT, -- JSON array empfehlung TEXT, empfehlung_symbol TEXT, verbesserungspotenzial TEXT, themen TEXT, -- JSON array antrag_zusammenfassung TEXT, antrag_kernpunkte TEXT, -- JSON array source TEXT DEFAULT 'batch', -- 'batch' or 'webapp' model TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Migrations cursor = await db.execute("PRAGMA table_info(jobs)") job_cols = {r[1] for r in await cursor.fetchall()} if "drucksache" not in job_cols: await db.execute("ALTER TABLE jobs ADD COLUMN drucksache TEXT") cursor = await db.execute("PRAGMA table_info(assessments)") ass_cols = {r[1] for r in await cursor.fetchall()} if "konfidenz" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN konfidenz TEXT") # #123 Embedding-Migration: Assessment-Zusammenfassungen bekommen # eigene Embeddings für Clustering (#105) und Ähnlichkeitssuche (#108). if "summary_embedding" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN summary_embedding BLOB") if "embedding_model" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN embedding_model TEXT") # #127: Drucksache-Typ (Original vom Landtag + normiert) if "typ" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN typ TEXT") if "typ_normiert" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN typ_normiert TEXT") # #133: Social-Media-Texte pro Antrag (vom LLM generiert) if "share_threads" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN share_threads TEXT") if "share_twitter" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN share_twitter TEXT") if "share_mastodon" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN share_mastodon TEXT") # #128: Fraktionen ohne hinterlegtes Wahlprogramm (JSON-Array) if "fehlende_programme" not in ass_cols: await db.execute("ALTER TABLE assessments ADD COLUMN fehlende_programme TEXT") # Bookmarks (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS bookmarks ( user_id TEXT NOT NULL, drucksache TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')), PRIMARY KEY (user_id, drucksache) ) """) # Merkliste — serverseitig persistent (#140) await db.execute(""" CREATE TABLE IF NOT EXISTS merkliste ( user_id TEXT NOT NULL, antrag_id TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, notiz TEXT, PRIMARY KEY (user_id, antrag_id) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_merkliste_user ON merkliste(user_id)" ) # Kommentare (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, user_name TEXT DEFAULT '', drucksache TEXT NOT NULL, text TEXT NOT NULL, visibility TEXT NOT NULL DEFAULT 'all', created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_comments_drucksache ON comments(drucksache)" ) # Votes / Crowd-Validation (#112) await db.execute(""" CREATE TABLE IF NOT EXISTS votes ( user_id TEXT NOT NULL, drucksache TEXT NOT NULL, target TEXT NOT NULL DEFAULT 'overall', vote TEXT NOT NULL CHECK(vote IN ('up', 'down')), created_at TEXT NOT NULL DEFAULT (datetime('now')), PRIMARY KEY (user_id, drucksache, target) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_votes_drucksache ON votes(drucksache)" ) # Assessment-Versionshistorie (#110) await db.execute(""" CREATE TABLE IF NOT EXISTS assessment_versions ( id INTEGER PRIMARY KEY AUTOINCREMENT, drucksache TEXT NOT NULL, version INTEGER NOT NULL, gwoe_score REAL, model TEXT, snapshot TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_versions_drucksache ON assessment_versions(drucksache)" ) # E-Mail-Abonnements (#124) # bundesland/partei NULL = beides "alle". frequency = 'daily' (weitere # später). last_sent als ISO-Timestamp, initial leer = sofort beim # ersten Digest-Lauf eligible. await db.execute(""" CREATE TABLE IF NOT EXISTS email_subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, email TEXT NOT NULL, bundesland TEXT, partei TEXT, frequency TEXT NOT NULL DEFAULT 'daily', last_sent TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_subs_user ON email_subscriptions(user_id)" ) # Monitoring-Tabellen (#135) await db.execute(""" CREATE TABLE IF NOT EXISTS monitoring_scans ( id INTEGER PRIMARY KEY AUTOINCREMENT, bundesland TEXT NOT NULL, drucksache TEXT NOT NULL, title TEXT, datum TEXT, typ TEXT, typ_normiert TEXT, fraktionen TEXT, -- JSON array link TEXT, is_assessed INTEGER NOT NULL DEFAULT 0, -- JOIN-Flag, aktuell nicht im Report seen_first_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, UNIQUE(bundesland, drucksache) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_monitoring_scans_bl ON monitoring_scans(bundesland)" ) await db.execute( "CREATE INDEX IF NOT EXISTS idx_monitoring_scans_first ON monitoring_scans(seen_first_at)" ) await db.execute(""" CREATE TABLE IF NOT EXISTS monitoring_daily_summary ( id INTEGER PRIMARY KEY AUTOINCREMENT, scan_date TEXT NOT NULL, -- ISO-Datum YYYY-MM-DD bundesland TEXT NOT NULL, total_seen INTEGER NOT NULL DEFAULT 0, new_count INTEGER NOT NULL DEFAULT 0, errors TEXT, -- Adapter-Fehlermeldungen (NULL = kein Fehler) created_at TEXT NOT NULL DEFAULT (datetime('now')), UNIQUE(scan_date, bundesland) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_monitoring_summary_date ON monitoring_daily_summary(scan_date)" ) # abgeordnetenwatch-Abstimmungsdaten (#106 Phase 1) await db.execute(""" CREATE TABLE IF NOT EXISTS abgeordnetenwatch_polls ( poll_id INTEGER PRIMARY KEY, parliament_id INTEGER NOT NULL, bundesland TEXT NOT NULL, drucksache TEXT, titel TEXT, datum DATE, accepted BOOLEAN, topics TEXT, legislature_label TEXT, synced_at TIMESTAMP ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_aw_polls_ds " "ON abgeordnetenwatch_polls(drucksache)" ) await db.execute(""" CREATE TABLE IF NOT EXISTS abgeordnetenwatch_votes ( poll_id INTEGER NOT NULL, politician_id INTEGER NOT NULL, politician_name TEXT, partei TEXT, vote TEXT, PRIMARY KEY (poll_id, politician_id), FOREIGN KEY (poll_id) REFERENCES abgeordnetenwatch_polls(poll_id) ) """) # Fraktions-aggregierte Abstimmungsergebnisse aus Plenarprotokollen (#106). # Granularitaet: "GRUENE und SPD haben zugestimmt", nicht pro MP — das # ist der Datentyp, der aus deterministischen Parsern wie # protokoll_parser_nrw.py rauskommt. # Compound-PK ueber quelle_protokoll, weil eine Drucksache mehrfach # abgestimmt werden kann (Ausschuss-Empfehlung + Plenum-Beschluss). await db.execute(""" CREATE TABLE IF NOT EXISTS plenum_vote_results ( bundesland TEXT NOT NULL, drucksache TEXT NOT NULL, ergebnis TEXT NOT NULL, einstimmig INTEGER NOT NULL DEFAULT 0, fraktionen_ja TEXT NOT NULL DEFAULT '[]', fraktionen_nein TEXT NOT NULL DEFAULT '[]', fraktionen_enthaltung TEXT NOT NULL DEFAULT '[]', quelle_protokoll TEXT NOT NULL, quelle_url TEXT, parsed_at TEXT NOT NULL DEFAULT (datetime('now')), PRIMARY KEY (bundesland, drucksache, quelle_protokoll) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_pvr_bl_ds " "ON plenum_vote_results(bundesland, drucksache)" ) await db.commit() # ─── Bookmark-Functions (#94) ─────────────────────────────────────────────── async def toggle_bookmark(user_id: str, drucksache: str) -> bool: """Toggle bookmark. Returns True if bookmarked, False if removed.""" async with aiosqlite.connect(settings.db_path) as db: row = await db.execute( "SELECT 1 FROM bookmarks WHERE user_id=? AND drucksache=?", (user_id, drucksache), ) exists = await row.fetchone() if exists: await db.execute( "DELETE FROM bookmarks WHERE user_id=? AND drucksache=?", (user_id, drucksache), ) await db.commit() return False else: await db.execute( "INSERT INTO bookmarks (user_id, drucksache) VALUES (?, ?)", (user_id, drucksache), ) await db.commit() return True async def get_bookmarks(user_id: str) -> list[str]: """Get all bookmarked drucksache IDs for a user.""" async with aiosqlite.connect(settings.db_path) as db: rows = await db.execute( "SELECT drucksache FROM bookmarks WHERE user_id=? ORDER BY created_at DESC", (user_id,), ) return [r[0] for r in await rows.fetchall()] # ─── Merkliste-Functions (#140) ───────────────────────────────────────────── async def merkliste_add(user_id: str, antrag_id: str, notiz: Optional[str] = None) -> dict: """Eintrag zur Merkliste hinzufügen (Upsert). Gibt den Eintrag zurück.""" async with aiosqlite.connect(settings.db_path) as db: await db.execute( """INSERT INTO merkliste (user_id, antrag_id, notiz) VALUES (?, ?, ?) ON CONFLICT(user_id, antrag_id) DO UPDATE SET notiz = COALESCE(excluded.notiz, merkliste.notiz)""", (user_id, antrag_id, notiz), ) await db.commit() db.row_factory = aiosqlite.Row row = await db.execute( "SELECT antrag_id, notiz, created_at FROM merkliste WHERE user_id=? AND antrag_id=?", (user_id, antrag_id), ) r = await row.fetchone() return dict(r) if r else {"antrag_id": antrag_id, "notiz": notiz} async def merkliste_remove(user_id: str, antrag_id: str) -> bool: """Eintrag aus der Merkliste entfernen. Gibt True zurück wenn gelöscht.""" async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "DELETE FROM merkliste WHERE user_id=? AND antrag_id=?", (user_id, antrag_id), ) await db.commit() return cur.rowcount > 0 async def merkliste_list(user_id: str) -> list[dict]: """Alle Merklisten-Einträge eines Users, sortiert nach created_at DESC.""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT antrag_id, notiz, created_at FROM merkliste WHERE user_id=? ORDER BY created_at DESC", (user_id,), ) return [dict(r) for r in await rows.fetchall()] async def merkliste_bulk_add(user_id: str, entries: list[dict]) -> int: """Mehrere Einträge auf einmal hinzufügen (Upsert). Für localStorage-Migration. Jeder Eintrag muss ``antrag_id`` enthalten; ``notiz`` ist optional. Gibt die Anzahl verarbeiteter Einträge zurück. """ async with aiosqlite.connect(settings.db_path) as db: count = 0 for entry in entries: antrag_id = entry.get("antrag_id") if not antrag_id: continue notiz = entry.get("notiz") await db.execute( """INSERT INTO merkliste (user_id, antrag_id, notiz) VALUES (?, ?, ?) ON CONFLICT(user_id, antrag_id) DO NOTHING""", (user_id, antrag_id, notiz), ) count += 1 await db.commit() return count # ─── Email-Subscription-Functions (#124) ──────────────────────────────────── async def create_subscription( user_id: str, email: str, bundesland: Optional[str] = None, partei: Optional[str] = None, frequency: str = "daily", ) -> int: """Neues Abo anlegen. Gibt die neue Abo-ID zurück.""" async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "INSERT INTO email_subscriptions (user_id, email, bundesland, partei, frequency) " "VALUES (?, ?, ?, ?, ?)", (user_id, email, bundesland, partei, frequency), ) await db.commit() return cur.lastrowid async def list_subscriptions(user_id: str) -> list[dict]: """Alle Abos eines Users.""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT id, email, bundesland, partei, frequency, last_sent, created_at " "FROM email_subscriptions WHERE user_id=? ORDER BY created_at DESC", (user_id,), ) return [dict(r) for r in await rows.fetchall()] async def list_all_subscriptions() -> list[dict]: """Alle Abos aller User — nur für Admin-Endpoints.""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT id, user_id, email, bundesland, partei, frequency, last_sent, created_at " "FROM email_subscriptions ORDER BY created_at DESC" ) return [dict(r) for r in await rows.fetchall()] async def delete_subscription(user_id: str, sub_id: int) -> bool: """Abo löschen. Prüft, dass es dem User gehört.""" async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "DELETE FROM email_subscriptions WHERE id=? AND user_id=?", (sub_id, user_id), ) await db.commit() return cur.rowcount > 0 async def delete_subscription_by_id(sub_id: int) -> bool: """Abo per ID löschen (für Unsubscribe-Token-Link, kein User-Check).""" async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "DELETE FROM email_subscriptions WHERE id=?", (sub_id,) ) await db.commit() return cur.rowcount > 0 async def get_all_subscriptions_due(frequency: str = "daily") -> list[dict]: """Alle Abos der gegebenen Frequency die einen Digest brauchen (last_sent IS NULL oder älter als 24h für daily).""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT * FROM email_subscriptions " "WHERE frequency = ? " " AND (last_sent IS NULL OR datetime(last_sent) < datetime('now', '-23 hours'))", (frequency,), ) return [dict(r) for r in await rows.fetchall()] async def mark_subscription_sent(sub_id: int) -> None: async with aiosqlite.connect(settings.db_path) as db: await db.execute( "UPDATE email_subscriptions SET last_sent = datetime('now') WHERE id = ?", (sub_id,), ) await db.commit() # ─── Comment-Functions (#94) ──────────────────────────────────────────────── async def add_comment(user_id: str, user_name: str, drucksache: str, text: str, visibility: str = "all") -> dict: """Add a comment. Returns the new comment dict.""" now = datetime.utcnow().isoformat() async with aiosqlite.connect(settings.db_path) as db: cursor = await db.execute( "INSERT INTO comments (user_id, user_name, drucksache, text, visibility, created_at) " "VALUES (?, ?, ?, ?, ?, ?)", (user_id, user_name, drucksache, text, visibility, now), ) await db.commit() return { "id": cursor.lastrowid, "user_id": user_id, "user_name": user_name, "drucksache": drucksache, "text": text, "visibility": visibility, "created_at": now, } async def get_comments(drucksache: str, user_id: Optional[str] = None) -> list[dict]: """Get comments for a drucksache, server-seitig nach Sichtbarkeit gefiltert. - 'all': immer sichtbar - 'authenticated': nur wenn user_id gesetzt (eingeloggt) - 'private': nur für den Autor - 'group:XYZ': für Gruppenmitglieder (TODO: Keycloak-Gruppen-Check) """ async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT * FROM comments WHERE drucksache=? ORDER BY created_at", (drucksache,), ) all_comments = [dict(r) for r in await rows.fetchall()] # Server-seitig filtern (Defense in Depth — Client filtert auch) result = [] for c in all_comments: vis = c.get("visibility", "all") if vis == "all": result.append(c) elif vis == "authenticated" and user_id: result.append(c) elif vis == "private" and user_id and c["user_id"] == user_id: result.append(c) elif vis.startswith("group:") and user_id: # TODO: Keycloak-Gruppen-Membership prüfen result.append(c) return result async def delete_comment(comment_id: int, user_id: str) -> bool: """Delete a comment (only by its author).""" async with aiosqlite.connect(settings.db_path) as db: cursor = await db.execute( "DELETE FROM comments WHERE id=? AND user_id=?", (comment_id, user_id), ) await db.commit() return cursor.rowcount > 0 # ─── Assessment-History (#110) ────────────────────────────────────────── async def get_assessment_history(drucksache: str) -> list[dict]: """Get version history for an assessment.""" import json as _json async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT version, gwoe_score, model, created_at FROM assessment_versions " "WHERE drucksache=? ORDER BY version DESC", (drucksache,), ) return [dict(r) for r in await rows.fetchall()] # ─── Vote-Functions (#112 Crowd-Validation) ─────────────────────────────── async def upsert_vote(user_id: str, drucksache: str, target: str, vote: str) -> dict: """Set or toggle a vote. Returns current vote state.""" async with aiosqlite.connect(settings.db_path) as db: existing = await db.execute( "SELECT vote FROM votes WHERE user_id=? AND drucksache=? AND target=?", (user_id, drucksache, target), ) row = await existing.fetchone() if row and row[0] == vote: # Same vote again → remove (toggle off) await db.execute( "DELETE FROM votes WHERE user_id=? AND drucksache=? AND target=?", (user_id, drucksache, target), ) await db.commit() return {"vote": None} else: await db.execute( "INSERT OR REPLACE INTO votes (user_id, drucksache, target, vote) VALUES (?, ?, ?, ?)", (user_id, drucksache, target, vote), ) await db.commit() return {"vote": vote} async def get_votes(drucksache: str, user_id: str = None) -> dict: """Get aggregated votes for a drucksache + optional user's own votes.""" async with aiosqlite.connect(settings.db_path) as db: # Aggregated counts per target rows = await db.execute( "SELECT target, vote, COUNT(*) as cnt FROM votes " "WHERE drucksache=? GROUP BY target, vote", (drucksache,), ) counts = {} for r in await rows.fetchall(): target, vote, cnt = r if target not in counts: counts[target] = {"up": 0, "down": 0} counts[target][vote] = cnt # User's own votes my_votes = {} if user_id: rows = await db.execute( "SELECT target, vote FROM votes WHERE drucksache=? AND user_id=?", (drucksache, user_id), ) for r in await rows.fetchall(): my_votes[r[0]] = r[1] return {"counts": counts, "my_votes": my_votes} async def create_job( job_id: str, input_preview: str, bundesland: str = "NRW", model: str = "qwen-plus", user_id: Optional[str] = None, drucksache: Optional[str] = None, ) -> dict: """Create a new analysis job.""" now = datetime.utcnow().isoformat() async with aiosqlite.connect(settings.db_path) as db: await db.execute( """ INSERT INTO jobs (id, input_preview, bundesland, model, user_id, drucksache, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (job_id, input_preview, bundesland, model, user_id, drucksache, now, now), ) await db.commit() return {"id": job_id, "status": "queued", "created_at": now} async def get_job(job_id: str) -> Optional[dict]: """Get job by ID.""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) row = await cursor.fetchone() if row: return dict(row) return None async def update_job(job_id: str, **kwargs) -> bool: """Update job fields.""" if not kwargs: return False kwargs["updated_at"] = datetime.utcnow().isoformat() fields = ", ".join(f"{k} = ?" for k in kwargs.keys()) values = list(kwargs.values()) + [job_id] async with aiosqlite.connect(settings.db_path) as db: await db.execute(f"UPDATE jobs SET {fields} WHERE id = ?", values) await db.commit() return True async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]: """Get jobs for a user (for history page).""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", (user_id, limit), ) rows = await cursor.fetchall() return [dict(row) for row in rows] async def upsert_assessment(data: dict) -> bool: """Insert or update an assessment. Archives old version if exists (#110).""" import json now = datetime.utcnow().isoformat() async with aiosqlite.connect(settings.db_path) as db: # Alte Version archivieren, falls vorhanden drucksache = data.get("drucksache") db.row_factory = aiosqlite.Row old = await db.execute("SELECT * FROM assessments WHERE drucksache=?", (drucksache,)) old_row = await old.fetchone() if old_row: old_dict = dict(old_row) # Version = bisherige Anzahl + 1 ver_count = await db.execute( "SELECT COUNT(*) FROM assessment_versions WHERE drucksache=?", (drucksache,) ) version = (await ver_count.fetchone())[0] + 1 await db.execute( "INSERT INTO assessment_versions (drucksache, version, gwoe_score, model, snapshot) " "VALUES (?, ?, ?, ?, ?)", (drucksache, version, old_dict.get("gwoe_score"), old_dict.get("model"), json.dumps(old_dict, ensure_ascii=False, default=str)), ) db.row_factory = None # #123: Assessment-Embedding (optional, kann None sein wenn Embedding- # API gerade down war — wird vom Backfill-Script später nachgezogen) summary_embedding = data.get("summary_embedding") # bytes | None embedding_model = data.get("embedding_model") # str | None await db.execute(""" INSERT INTO assessments ( drucksache, title, fraktionen, datum, link, bundesland, gwoe_score, gwoe_begruendung, gwoe_matrix, gwoe_schwerpunkt, wahlprogramm_scores, verbesserungen, staerken, schwaechen, empfehlung, empfehlung_symbol, verbesserungspotenzial, themen, antrag_zusammenfassung, antrag_kernpunkte, source, model, konfidenz, summary_embedding, embedding_model, share_threads, share_twitter, share_mastodon, fehlende_programme, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(drucksache) DO UPDATE SET title = excluded.title, gwoe_score = excluded.gwoe_score, gwoe_begruendung = excluded.gwoe_begruendung, gwoe_matrix = excluded.gwoe_matrix, konfidenz = excluded.konfidenz, summary_embedding = COALESCE(excluded.summary_embedding, assessments.summary_embedding), embedding_model = COALESCE(excluded.embedding_model, assessments.embedding_model), share_threads = COALESCE(excluded.share_threads, assessments.share_threads), share_twitter = COALESCE(excluded.share_twitter, assessments.share_twitter), share_mastodon = COALESCE(excluded.share_mastodon, assessments.share_mastodon), fehlende_programme = excluded.fehlende_programme, updated_at = excluded.updated_at """, ( data.get("drucksache"), data.get("title"), json.dumps(data.get("fraktionen", [])), data.get("datum"), data.get("link"), data.get("bundesland", "NRW"), data.get("gwoeScore"), data.get("gwoeBegründung"), json.dumps(data.get("gwoeMatrix", [])), json.dumps(data.get("gwoeSchwerpunkt", [])), json.dumps(data.get("wahlprogrammScores", [])), json.dumps(data.get("verbesserungen", [])), json.dumps(data.get("stärken", [])), json.dumps(data.get("schwächen", [])), data.get("empfehlung"), data.get("empfehlungSymbol"), data.get("verbesserungspotenzial"), json.dumps(data.get("themen", [])), data.get("antragZusammenfassung"), json.dumps(data.get("antragKernpunkte", [])), data.get("source", "webapp"), data.get("model"), data.get("konfidenz"), summary_embedding, embedding_model, data.get("share_threads"), data.get("share_twitter"), data.get("share_mastodon"), json.dumps(data.get("fehlendeProgramme", [])), now, now )) await db.commit() return True async def get_assessment(drucksache: str) -> Optional[dict]: """Get assessment by drucksache ID.""" import json async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM assessments WHERE drucksache = ?", (drucksache,) ) row = await cursor.fetchone() if row: d = dict(row) # Parse JSON fields for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt", "wahlprogramm_scores", "verbesserungen", "staerken", "schwaechen", "themen", "antrag_kernpunkte", "fehlende_programme"]: if d.get(field): try: d[field] = json.loads(d[field]) except: pass return d return None async def delete_assessment(drucksache: str) -> bool: """Delete an assessment by drucksache ID. Used by the cite-endpoint to trigger re-analysis of Pre-#60 hallucinated assessments.""" async with aiosqlite.connect(settings.db_path) as db: cursor = await db.execute( "DELETE FROM assessments WHERE drucksache = ?", (drucksache,) ) await db.commit() return cursor.rowcount > 0 async def get_all_assessments(bundesland: str = None) -> list[dict]: """Get all assessments from database, optionally filtered by Bundesland. The special value ``"ALL"`` and ``None`` mean no filter — both behave identically and return every row. Any other value becomes a strict ``WHERE bundesland = ?`` match. """ import json sql = "SELECT * FROM assessments" params: list = [] if bundesland and bundesland != "ALL": sql += " WHERE bundesland = ?" params.append(bundesland) sql += " ORDER BY gwoe_score DESC" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(sql, params) rows = await cursor.fetchall() results = [] for row in rows: d = dict(row) # Parse JSON fields for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt", "wahlprogramm_scores", "verbesserungen", "staerken", "schwaechen", "themen", "antrag_kernpunkte", "fehlende_programme"]: if d.get(field): try: d[field] = json.loads(d[field]) except: pass results.append(d) return results async def import_json_assessments(assessments_dir): """Import assessments from JSON files into database.""" import json from pathlib import Path dir_path = Path(assessments_dir) if not dir_path.exists(): return 0 count = 0 for f in dir_path.glob("*.json"): try: data = json.loads(f.read_text()) data["source"] = "batch" await upsert_assessment(data) count += 1 except Exception as e: print(f"Error importing {f}: {e}") return count def _parse_search_query(query: str) -> tuple[list[str], bool]: """ Parse search query for AND logic and exact phrases. Returns: (terms, is_exact) Examples: - 'Klimaschutz Energie' -> (['klimaschutz', 'energie'], False) - '"Grüner Stahl"' -> (['grüner stahl'], True) """ query = query.strip() # Check for exact phrase (entire query in quotes) if query.startswith('"') and query.endswith('"') and query.count('"') == 2: exact = query[1:-1].strip() return ([exact.lower()], True) # Extract quoted phrases and regular terms import logging import shlex try: parts = shlex.split(query) except ValueError as e: # Unbalanced quote — fall back to whitespace split, but log so we # notice patterns of malformed queries (Issue #57 Befund #7). logging.getLogger(__name__).warning( "shlex.split failed on search query (%s), falling back to whitespace split", e ) parts = query.split() return ([p.lower() for p in parts], False) async def search_assessments(query: str, bundesland: str = None, limit: int = 50) -> list[dict]: """Search assessments by title, drucksache, or themen. Supports AND logic.""" import json terms, is_exact = _parse_search_query(query) # Build SQL for first term (to narrow down results) first_term = terms[0] if terms else query.lower() sql = """ SELECT * FROM assessments WHERE ( LOWER(drucksache) LIKE ? OR LOWER(title) LIKE ? OR LOWER(themen) LIKE ? OR LOWER(fraktionen) LIKE ? ) """ params = [f"%{first_term}%"] * 4 if bundesland and bundesland != "ALL": sql += " AND bundesland = ?" params.append(bundesland) sql += " ORDER BY gwoe_score DESC LIMIT ?" params.append(limit) async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cursor = await db.execute(sql, params) rows = await cursor.fetchall() results = [] for row in rows: d = dict(row) for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt", "wahlprogramm_scores", "verbesserungen", "staerken", "schwaechen", "themen", "antrag_kernpunkte", "fehlende_programme"]: if d.get(field): try: d[field] = json.loads(d[field]) except: pass # Apply AND filter for multiple terms if len(terms) > 1 or is_exact: searchable = f"{d.get('title', '')} {d.get('drucksache', '')} {' '.join(d.get('fraktionen', []))} {' '.join(d.get('themen', []))}".lower() if is_exact: if terms[0] not in searchable: continue else: if not all(term in searchable for term in terms): continue results.append(d) return results # ─── Monitoring-Functions (#135) ──────────────────────────────────────────── async def upsert_monitoring_scan( bundesland: str, drucksache: str, title: Optional[str], datum: Optional[str], typ: Optional[str], typ_normiert: Optional[str], fraktionen: list, link: Optional[str], now: str, ) -> bool: """UPSERT für einen Monitoring-Treffer. seen_first_at bleibt beim ersten INSERT stabil. last_seen_at wird bei jedem Lauf aktualisiert. Gibt True zurück wenn der Eintrag neu angelegt wurde (new), False bei Update (already known). """ import json as _json async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "SELECT id FROM monitoring_scans WHERE bundesland=? AND drucksache=?", (bundesland, drucksache), ) existing = await cur.fetchone() if existing: await db.execute( "UPDATE monitoring_scans SET last_seen_at=? WHERE bundesland=? AND drucksache=?", (now, bundesland, drucksache), ) await db.commit() return False # bereits bekannt else: await db.execute( """INSERT INTO monitoring_scans (bundesland, drucksache, title, datum, typ, typ_normiert, fraktionen, link, seen_first_at, last_seen_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( bundesland, drucksache, title, datum, typ, typ_normiert, _json.dumps(fraktionen or [], ensure_ascii=False), link, now, now, ), ) await db.commit() return True # neu async def upsert_monitoring_summary( scan_date: str, bundesland: str, total_seen: int, new_count: int, errors: Optional[str], ) -> None: """UPSERT tägliche Zusammenfassung pro Bundesland.""" async with aiosqlite.connect(settings.db_path) as db: await db.execute( """INSERT INTO monitoring_daily_summary (scan_date, bundesland, total_seen, new_count, errors) VALUES (?, ?, ?, ?, ?) ON CONFLICT(scan_date, bundesland) DO UPDATE SET total_seen = excluded.total_seen, new_count = excluded.new_count, errors = excluded.errors""", (scan_date, bundesland, total_seen, new_count, errors), ) await db.commit() async def get_monitoring_summary(scan_date: str) -> list[dict]: """Alle Zusammenfassungen für ein bestimmtes Datum.""" async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT * FROM monitoring_daily_summary WHERE scan_date=? ORDER BY bundesland", (scan_date,), ) return [dict(r) for r in await rows.fetchall()] # ─── abgeordnetenwatch-Functions (#106) ───────────────────────────────────── async def upsert_aw_poll( poll_id: int, parliament_id: int, bundesland: str, drucksache: Optional[str], titel: Optional[str], datum: Optional[str], accepted: Optional[bool], topics: list, legislature_label: Optional[str], synced_at: str, ) -> bool: """UPSERT für einen abgeordnetenwatch-Poll. Gibt True zurück wenn der Eintrag neu angelegt wurde, False bei Update. """ import json as _json async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "SELECT poll_id FROM abgeordnetenwatch_polls WHERE poll_id=?", (poll_id,), ) existing = await cur.fetchone() if existing: await db.execute( """UPDATE abgeordnetenwatch_polls SET drucksache=?, titel=?, datum=?, accepted=?, topics=?, legislature_label=?, synced_at=? WHERE poll_id=?""", ( drucksache, titel, datum, 1 if accepted else 0, _json.dumps(topics or [], ensure_ascii=False), legislature_label, synced_at, poll_id, ), ) await db.commit() return False else: await db.execute( """INSERT INTO abgeordnetenwatch_polls (poll_id, parliament_id, bundesland, drucksache, titel, datum, accepted, topics, legislature_label, synced_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( poll_id, parliament_id, bundesland, drucksache, titel, datum, 1 if accepted else 0, _json.dumps(topics or [], ensure_ascii=False), legislature_label, synced_at, ), ) await db.commit() return True async def upsert_aw_vote( poll_id: int, politician_id: int, politician_name: Optional[str], partei: Optional[str], vote: str, ) -> bool: """UPSERT für eine Einzelstimme. Gibt True zurück wenn der Eintrag neu angelegt wurde, False bei Update. """ async with aiosqlite.connect(settings.db_path) as db: cur = await db.execute( "SELECT poll_id FROM abgeordnetenwatch_votes WHERE poll_id=? AND politician_id=?", (poll_id, politician_id), ) existing = await cur.fetchone() if existing: await db.execute( """UPDATE abgeordnetenwatch_votes SET politician_name=?, partei=?, vote=? WHERE poll_id=? AND politician_id=?""", (politician_name, partei, vote, poll_id, politician_id), ) await db.commit() return False else: await db.execute( """INSERT INTO abgeordnetenwatch_votes (poll_id, politician_id, politician_name, partei, vote) VALUES (?, ?, ?, ?, ?)""", (poll_id, politician_id, politician_name, partei, vote), ) await db.commit() return True async def get_abstimmungsverhalten(drucksache: str) -> Optional[dict]: """Gibt Fraktions-Aggregate (Yes/No/Abstain pro Partei) zurück. Sucht in abgeordnetenwatch_polls nach der Drucksache und aggregiert die Stimmen aus abgeordnetenwatch_votes nach Partei. Returns: Dict mit ``poll_id``, ``titel``, ``datum``, ``accepted`` und ``fraktionen`` (Liste von {partei, yes, no, abstain, no_show}). None wenn keine Abstimmungsdaten vorliegen. """ async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row cur = await db.execute( "SELECT * FROM abgeordnetenwatch_polls WHERE drucksache=? LIMIT 1", (drucksache,), ) poll_row = await cur.fetchone() if not poll_row: return None poll = dict(poll_row) rows = await db.execute( """SELECT partei, SUM(CASE WHEN vote='yes' THEN 1 ELSE 0 END) AS yes, SUM(CASE WHEN vote='no' THEN 1 ELSE 0 END) AS no, SUM(CASE WHEN vote='abstain' THEN 1 ELSE 0 END) AS abstain, SUM(CASE WHEN vote='no_show' THEN 1 ELSE 0 END) AS no_show FROM abgeordnetenwatch_votes WHERE poll_id=? GROUP BY partei ORDER BY (yes + no + abstain + no_show) DESC""", (poll["poll_id"],), ) fraktionen = [ { "partei": r["partei"] or "Unbekannt", "yes": r["yes"], "no": r["no"], "abstain": r["abstain"], "no_show": r["no_show"], } for r in await rows.fetchall() ] return { "poll_id": poll["poll_id"], "titel": poll["titel"], "datum": poll["datum"], "accepted": bool(poll["accepted"]), "fraktionen": fraktionen, } async def get_monitoring_new_today(scan_date: str) -> list[dict]: """Alle Einträge aus monitoring_scans, die am scan_date erstmals gesehen wurden.""" import json as _json async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( "SELECT * FROM monitoring_scans WHERE seen_first_at LIKE ? ORDER BY bundesland, drucksache", (f"{scan_date}%",), ) result = [] for r in await rows.fetchall(): d = dict(r) if d.get("fraktionen"): try: d["fraktionen"] = _json.loads(d["fraktionen"]) except Exception: pass result.append(d) return result # ─── Plenum-Vote-Results (#106) ───────────────────────────────────────────── # Fraktions-aggregierte Abstimmungsergebnisse aus Plenarprotokollen. # Quelle: protokoll_parser_nrw.py (NRW). BL-uebergreifender Parser ist #126. async def upsert_plenum_vote( *, bundesland: str, drucksache: str, ergebnis: str, einstimmig: bool, fraktionen_ja: list[str], fraktionen_nein: list[str], fraktionen_enthaltung: list[str], quelle_protokoll: str, quelle_url: Optional[str] = None, ) -> None: """Schreibt ein Abstimmungsergebnis aus einem Plenarprotokoll. Idempotent ueber den Compound-PK (bundesland, drucksache, quelle_protokoll): derselbe Eintrag aus demselben Protokoll wird upgesertet, mehrfach-Voten derselben Drucksache aus verschiedenen Protokollen behalten beide Eintraege. """ import json as _json async with aiosqlite.connect(settings.db_path) as db: await db.execute( """ INSERT INTO plenum_vote_results (bundesland, drucksache, ergebnis, einstimmig, fraktionen_ja, fraktionen_nein, fraktionen_enthaltung, quelle_protokoll, quelle_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(bundesland, drucksache, quelle_protokoll) DO UPDATE SET ergebnis = excluded.ergebnis, einstimmig = excluded.einstimmig, fraktionen_ja = excluded.fraktionen_ja, fraktionen_nein = excluded.fraktionen_nein, fraktionen_enthaltung = excluded.fraktionen_enthaltung, quelle_url = excluded.quelle_url, parsed_at = datetime('now') """, ( bundesland, drucksache, ergebnis, 1 if einstimmig else 0, _json.dumps(fraktionen_ja, ensure_ascii=False), _json.dumps(fraktionen_nein, ensure_ascii=False), _json.dumps(fraktionen_enthaltung, ensure_ascii=False), quelle_protokoll, quelle_url, ), ) await db.commit() async def get_plenum_votes(bundesland: str, drucksache: str) -> list[dict]: """Alle Plenarprotokoll-Abstimmungen fuer eine Drucksache, neueste zuerst. Eine Drucksache kann mehrfach abgestimmt werden (z.B. Ueberweisung + finale Beschlussfassung), deshalb Liste statt Single. """ import json as _json async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row rows = await db.execute( """ SELECT * FROM plenum_vote_results WHERE bundesland = ? AND drucksache = ? ORDER BY parsed_at DESC """, (bundesland, drucksache), ) out = [] for r in await rows.fetchall(): d = dict(r) d["einstimmig"] = bool(d.get("einstimmig")) for key in ("fraktionen_ja", "fraktionen_nein", "fraktionen_enthaltung"): try: d[key] = _json.loads(d.get(key) or "[]") except Exception: d[key] = [] out.append(d) return out