"""SQLite database for job tracking.""" import aiosqlite from datetime import datetime from typing import Optional from .config import settings async def init_db(): """Initialize database with tables.""" async with aiosqlite.connect(settings.db_path) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'queued', input_preview TEXT, bundesland TEXT DEFAULT 'NRW', model TEXT DEFAULT 'qwen-plus', result TEXT, html_path TEXT, pdf_path TEXT, error TEXT, user_id TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Assessments table for pre-computed + new analyses await db.execute(""" CREATE TABLE IF NOT EXISTS assessments ( drucksache TEXT PRIMARY KEY, title TEXT, fraktionen TEXT, -- JSON array datum TEXT, link TEXT, bundesland TEXT DEFAULT 'NRW', gwoe_score REAL, gwoe_begruendung TEXT, gwoe_matrix TEXT, -- JSON array gwoe_schwerpunkt TEXT, -- JSON array wahlprogramm_scores TEXT, -- JSON array verbesserungen TEXT, -- JSON array staerken TEXT, -- JSON array schwaechen TEXT, -- JSON array empfehlung TEXT, empfehlung_symbol TEXT, verbesserungspotenzial TEXT, themen TEXT, -- JSON array antrag_zusammenfassung TEXT, antrag_kernpunkte TEXT, -- JSON array source TEXT DEFAULT 'batch', -- 'batch' or 'webapp' model TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Bookmarks (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS bookmarks ( user_id TEXT NOT NULL, drucksache TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')), PRIMARY KEY (user_id, drucksache) ) """) # Kommentare (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, user_name TEXT DEFAULT '', drucksache TEXT NOT NULL, text TEXT NOT NULL, visibility TEXT NOT NULL DEFAULT 'all', created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) await db.execute( "CREATE INDEX IF NOT EXISTS idx_comments_drucksache ON comments(drucksache)" ) await db.commit() 
# ─── Shared helpers ──────────────────────────────────────────────────────────

# Assessment columns stored as JSON text; decoded back into Python objects
# whenever assessment rows are read.
_JSON_FIELDS = (
    "fraktionen",
    "gwoe_matrix",
    "gwoe_schwerpunkt",
    "wahlprogramm_scores",
    "verbesserungen",
    "staerken",
    "schwaechen",
    "themen",
    "antrag_kernpunkte",
)

# Column mapping used by upsert_assessment:
# (column, key in the incoming assessment dict, stored as JSON?, default).
# JSON columns fall back to an empty list when the key is missing.
_ASSESSMENT_FIELDS = (
    ("drucksache", "drucksache", False, None),
    ("title", "title", False, None),
    ("fraktionen", "fraktionen", True, None),
    ("datum", "datum", False, None),
    ("link", "link", False, None),
    ("bundesland", "bundesland", False, "NRW"),
    ("gwoe_score", "gwoeScore", False, None),
    ("gwoe_begruendung", "gwoeBegründung", False, None),
    ("gwoe_matrix", "gwoeMatrix", True, None),
    ("gwoe_schwerpunkt", "gwoeSchwerpunkt", True, None),
    ("wahlprogramm_scores", "wahlprogrammScores", True, None),
    ("verbesserungen", "verbesserungen", True, None),
    ("staerken", "stärken", True, None),
    ("schwaechen", "schwächen", True, None),
    ("empfehlung", "empfehlung", False, None),
    ("empfehlung_symbol", "empfehlungSymbol", False, None),
    ("verbesserungspotenzial", "verbesserungspotenzial", False, None),
    ("themen", "themen", True, None),
    ("antrag_zusammenfassung", "antragZusammenfassung", False, None),
    ("antrag_kernpunkte", "antragKernpunkte", True, None),
    ("source", "source", False, "webapp"),
    ("model", "model", False, None),
)

# Columns an upsert has always overwritten on conflict, even when the incoming
# dict lacks the key (kept for backward compatibility).
_ALWAYS_UPDATED_COLUMNS = frozenset(
    {"title", "gwoe_score", "gwoe_begruendung", "gwoe_matrix"}
)


def _utcnow_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces exactly the format ``datetime.utcnow().isoformat()`` did (no
    ``+00:00`` suffix), so new timestamps stay comparable with stored ones,
    without calling ``utcnow`` (deprecated since Python 3.12).
    """
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _decode_json_fields(d: dict) -> dict:
    """Decode the JSON columns of an assessment row dict in place; return it.

    Empty values are skipped and values that are not valid JSON are left as
    stored (best effort, so one malformed column cannot break a read).
    """
    import json

    for field in _JSON_FIELDS:
        raw = d.get(field)
        if not raw:
            continue
        try:
            d[field] = json.loads(raw)
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            pass
    return d


def _join_text(value) -> str:
    """Flatten a decoded list column (or None / raw string) into plain text."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        return " ".join(str(item) for item in value)
    return str(value)


def _searchable_text(d: dict) -> str:
    """Lower-cased text of title, drucksache, fraktionen and themen of a row."""
    parts = (
        d.get("title") or "",
        d.get("drucksache") or "",
        _join_text(d.get("fraktionen")),
        _join_text(d.get("themen")),
    )
    return " ".join(str(p) for p in parts).lower()


def _build_assessment_upsert(data: dict, now: str) -> tuple[str, list]:
    """Build the ``INSERT ... ON CONFLICT`` statement and parameters for *data*.

    A new row receives every column. On conflict (same ``drucksache``) the
    columns in ``_ALWAYS_UPDATED_COLUMNS`` are always overwritten, every other
    content column only when *data* actually contains its key (so a partial
    payload cannot wipe existing metadata), and ``updated_at`` is bumped.
    ``drucksache`` and ``created_at`` are never changed by an update.
    """
    import json

    row: dict = {}
    update_columns: list[str] = []
    for column, key, is_json, default in _ASSESSMENT_FIELDS:
        if is_json:
            row[column] = json.dumps(data.get(key, []))
        else:
            row[column] = data.get(key, default)
        if column != "drucksache" and (
            column in _ALWAYS_UPDATED_COLUMNS or key in data
        ):
            update_columns.append(column)
    row["created_at"] = now
    row["updated_at"] = now
    update_columns.append("updated_at")

    # Column names come from the module constant above, never from *data*,
    # so interpolating them into the SQL text is safe.
    columns = ", ".join(row)
    placeholders = ", ".join(["?"] * len(row))
    assignments = ", ".join(f"{col} = excluded.{col}" for col in update_columns)
    sql = (
        f"INSERT INTO assessments ({columns}) VALUES ({placeholders}) "
        f"ON CONFLICT(drucksache) DO UPDATE SET {assignments}"
    )
    return sql, list(row.values())


# ─── Bookmark-Functions (#94) ───────────────────────────────────────────────


async def toggle_bookmark(user_id: str, drucksache: str) -> bool:
    """Toggle a bookmark for *user_id* on *drucksache*.

    Returns True if the bookmark exists afterwards, False if it was removed.
    Deleting first and inserting with ``INSERT OR IGNORE`` avoids the
    check-then-insert race of a SELECT followed by INSERT: a concurrent
    toggle that already created the row cannot raise an IntegrityError here.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "DELETE FROM bookmarks WHERE user_id=? AND drucksache=?",
            (user_id, drucksache),
        )
        if cursor.rowcount > 0:
            await db.commit()
            return False
        await db.execute(
            "INSERT OR IGNORE INTO bookmarks (user_id, drucksache) VALUES (?, ?)",
            (user_id, drucksache),
        )
        await db.commit()
        return True


async def get_bookmarks(user_id: str) -> list[str]:
    """Return all bookmarked drucksache IDs of *user_id*, newest first."""
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "SELECT drucksache FROM bookmarks WHERE user_id=? ORDER BY created_at DESC",
            (user_id,),
        )
        return [r[0] for r in await cursor.fetchall()]


# ─── Comment-Functions (#94) ────────────────────────────────────────────────


async def add_comment(user_id: str, user_name: str, drucksache: str, text: str,
                      visibility: str = "all") -> dict:
    """Insert a comment and return it as a dict (including its new ``id``)."""
    now = _utcnow_iso()
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "INSERT INTO comments (user_id, user_name, drucksache, text, visibility, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (user_id, user_name, drucksache, text, visibility, now),
        )
        await db.commit()
        return {
            "id": cursor.lastrowid,
            "user_id": user_id,
            "user_name": user_name,
            "drucksache": drucksache,
            "text": text,
            "visibility": visibility,
            "created_at": now,
        }


def _comment_visible(comment: dict, user_id: Optional[str]) -> bool:
    """Decide whether *comment* may be shown to *user_id* (None = anonymous)."""
    vis = comment.get("visibility", "all")
    if vis == "all":
        return True
    if not user_id:
        return False
    if vis == "authenticated":
        return True
    if vis == "private":
        return comment["user_id"] == user_id
    if vis.startswith("group:"):
        # TODO: check Keycloak group membership. Until then every logged-in
        # user can see group comments.
        return True
    return False


async def get_comments(drucksache: str, user_id: Optional[str] = None) -> list[dict]:
    """Return the comments on *drucksache*, filtered server-side by visibility.

    - 'all': always visible
    - 'authenticated': only when user_id is set (logged in)
    - 'private': only for the author
    - 'group:XYZ': for group members (TODO: Keycloak group check; currently
      any logged-in user)
    """
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM comments WHERE drucksache=? ORDER BY created_at",
            (drucksache,),
        )
        rows = await cursor.fetchall()
    # Server-side filtering (defense in depth — the client filters as well).
    return [c for c in map(dict, rows) if _comment_visible(c, user_id)]


async def delete_comment(comment_id: int, user_id: str) -> bool:
    """Delete a comment (only by its author). True if a row was deleted."""
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "DELETE FROM comments WHERE id=? AND user_id=?",
            (comment_id, user_id),
        )
        await db.commit()
        return cursor.rowcount > 0


async def create_job(
    job_id: str,
    input_preview: str,
    bundesland: str = "NRW",
    model: str = "qwen-plus",
    user_id: Optional[str] = None,
) -> dict:
    """Create a new analysis job (status defaults to 'queued' in the schema)."""
    now = _utcnow_iso()
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            """
            INSERT INTO jobs (id, input_preview, bundesland, model, user_id, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (job_id, input_preview, bundesland, model, user_id, now, now),
        )
        await db.commit()
    return {"id": job_id, "status": "queued", "created_at": now}


async def get_job(job_id: str) -> Optional[dict]:
    """Return the job row as a dict, or None if no job has that ID."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
        row = await cursor.fetchone()
        return dict(row) if row else None


async def update_job(job_id: str, **kwargs) -> bool:
    """Set the given columns on job *job_id* and bump ``updated_at``.

    Returns False when no fields are given or no job with that ID exists,
    True otherwise.

    Raises:
        ValueError: if a keyword is not a plain identifier. Column names are
            interpolated into the SQL text (only values can be bound), so
            arbitrary keys passed via ``**mapping`` could otherwise inject SQL.
    """
    if not kwargs:
        return False
    invalid = [key for key in kwargs if not key.isidentifier()]
    if invalid:
        raise ValueError(f"Invalid job column name(s): {invalid!r}")

    fields = {**kwargs, "updated_at": _utcnow_iso()}
    assignments = ", ".join(f"{key} = ?" for key in fields)
    values = [*fields.values(), job_id]
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(f"UPDATE jobs SET {assignments} WHERE id = ?", values)
        await db.commit()
        return cursor.rowcount > 0


async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]:
    """Return up to *limit* jobs of *user_id*, newest first (history page)."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
            (user_id, limit),
        )
        rows = await cursor.fetchall()
        return [dict(row) for row in rows]


async def upsert_assessment(data: dict) -> bool:
    """Insert an assessment or refresh the existing row with the same drucksache.

    *data* uses the camelCase / umlaut keys of the assessment JSON
    (``gwoeScore``, ``stärken``, ...); see ``_ASSESSMENT_FIELDS`` for the
    column mapping and ``_build_assessment_upsert`` for the update rules.
    """
    sql, params = _build_assessment_upsert(data, _utcnow_iso())
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(sql, params)
        await db.commit()
    return True


async def get_assessment(drucksache: str) -> Optional[dict]:
    """Return the assessment for *drucksache* with JSON columns decoded, or None."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM assessments WHERE drucksache = ?", (drucksache,)
        )
        row = await cursor.fetchone()
    return _decode_json_fields(dict(row)) if row else None


async def delete_assessment(drucksache: str) -> bool:
    """Delete an assessment by drucksache ID. True if a row was deleted.

    Used by the cite-endpoint to trigger re-analysis of Pre-#60 hallucinated
    assessments."""
    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "DELETE FROM assessments WHERE drucksache = ?", (drucksache,)
        )
        await db.commit()
        return cursor.rowcount > 0


async def get_all_assessments(bundesland: Optional[str] = None) -> list[dict]:
    """Get all assessments from database, optionally filtered by Bundesland.

    The special value ``"ALL"`` and ``None`` mean no filter — both behave
    identically and return every row. Any other value becomes a strict
    ``WHERE bundesland = ?`` match. Rows are ordered by ``gwoe_score``
    descending and have their JSON columns decoded.
    """
    sql = "SELECT * FROM assessments"
    params: list = []
    if bundesland and bundesland != "ALL":
        sql += " WHERE bundesland = ?"
        params.append(bundesland)
    sql += " ORDER BY gwoe_score DESC"

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()
    return [_decode_json_fields(dict(row)) for row in rows]


async def import_json_assessments(assessments_dir) -> int:
    """Import every ``*.json`` file in *assessments_dir* as a 'batch' assessment.

    Returns the number of files imported; 0 if the directory does not exist.
    A file that fails to read, parse or store is logged and skipped so one
    bad file does not abort the whole import.
    """
    import json
    import logging
    from pathlib import Path

    logger = logging.getLogger(__name__)
    dir_path = Path(assessments_dir)
    if not dir_path.exists():
        return 0

    count = 0
    # Sorted for a deterministic import order (later files win on conflict).
    for f in sorted(dir_path.glob("*.json")):
        try:
            # Explicit UTF-8: the JSON keys themselves contain umlauts.
            data = json.loads(f.read_text(encoding="utf-8"))
            data["source"] = "batch"
            await upsert_assessment(data)
            count += 1
        except Exception as e:  # per-file best effort; keep importing the rest
            logger.warning("Error importing %s: %s", f, e)
    return count


def _parse_search_query(query: str) -> tuple[list[str], bool]:
    """
    Parse search query for AND logic and exact phrases.

    Returns: (terms, is_exact), all terms lower-cased.

    Examples:
        - 'Klimaschutz Energie' -> (['klimaschutz', 'energie'], False)
        - '"Grüner Stahl"' -> (['grüner stahl'], True)
        - 'a "b c"' -> (['a', 'b c'], False)
    """
    import logging
    import shlex

    query = query.strip()

    # Whole query wrapped in exactly one pair of quotes -> exact phrase.
    if query.startswith('"') and query.endswith('"') and query.count('"') == 2:
        exact = query[1:-1].strip()
        return ([exact.lower()], True)

    # Otherwise split into terms, keeping quoted sub-phrases together.
    try:
        parts = shlex.split(query)
    except ValueError as e:
        # Unbalanced quote — fall back to whitespace split, but log so we
        # notice patterns of malformed queries (Issue #57 Befund #7).
        logging.getLogger(__name__).warning(
            "shlex.split failed on search query (%s), falling back to whitespace split", e
        )
        parts = query.split()
    return ([p.lower() for p in parts], False)


async def search_assessments(query: str, bundesland: Optional[str] = None,
                             limit: int = 50) -> list[dict]:
    """Search assessments by title, drucksache, themen or fraktionen.

    Supports AND logic over several terms and exact phrases (see
    ``_parse_search_query``). SQL pre-selects rows containing the first term;
    further terms / the exact phrase are checked in Python. *limit* is applied
    to the final, filtered result, so multi-term searches no longer lose
    matches that ranked below the first *limit* first-term hits.

    NOTE(review): SQLite's LOWER() folds ASCII only and JSON columns are
    stored with escaped non-ASCII characters, so the SQL pre-selection can
    miss umlaut matches in themen/fraktionen.
    """
    terms, is_exact = _parse_search_query(query)
    post_filter = len(terms) > 1 or is_exact

    # Build SQL for first term (to narrow down results)
    first_term = terms[0] if terms else query.lower()
    sql = """
        SELECT * FROM assessments
        WHERE (
            LOWER(drucksache) LIKE ? OR
            LOWER(title) LIKE ? OR
            LOWER(themen) LIKE ? OR
            LOWER(fraktionen) LIKE ?
        )
    """
    params: list = [f"%{first_term}%"] * 4
    if bundesland and bundesland != "ALL":
        sql += " AND bundesland = ?"
        params.append(bundesland)
    sql += " ORDER BY gwoe_score DESC"
    if not post_filter:
        # No Python filtering follows, so SQL can apply the limit directly.
        sql += " LIMIT ?"
        params.append(limit)

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        # Mirrors SQLite LIMIT: 0 yields nothing, a negative limit means "all".
        if 0 <= limit <= len(results):
            break
        d = _decode_json_fields(dict(row))
        if post_filter:
            searchable = _searchable_text(d)
            if is_exact:
                matched = terms[0] in searchable
            else:
                matched = all(term in searchable for term in terms)
            if not matched:
                continue
        results.append(d)
    return results