gwoe-antragspruefer/app/database.py
Dotty Dotter a821c19202 #47: Auto-Re-Analyse bei nicht-verifizierbaren Zitaten
Statt eine Nachricht "Textstelle nicht auffindbar" zu zeigen (was User
zurecht als Quatsch bezeichnet hat), erkennt der Cite-Endpoint jetzt
halluzinierte Zitate und triggert automatisch eine Re-Analyse:

Flow:
1. User klickt auf Zitat-Link
2. render_highlighted_page gibt (pdf, page, highlighted=False) zurück
3. Endpoint prüft: ds+bl Parameter vorhanden? Assessment in DB?
4. → Löscht altes Assessment, startet Re-Analyse als Background-Task
5. → Zeigt HTML-Warte-Seite mit Spinner und "Wird neu analysiert..."
6. → Auto-Redirect nach 15s zurück zum Assessment

Das neue Assessment hat durch reconstruct_zitate verifizierte Zitate,
die dann beim nächsten Klick korrekt gehighlighted werden.

Änderungen:
- embeddings.render_highlighted_page: Return-Typ (bytes, int, bool) —
  drittes Element ist True wenn Highlight gesetzt wurde
- database.delete_assessment: neue Funktion für die Re-Analyse
- main.py cite-Endpoint: akzeptiert ds= und bl= als optionale Params,
  triggert Re-Analyse bei highlighted=False + ds vorhanden
- Frontend: makeCiteUrl reicht ds+bl aus dem Assessment-Kontext mit
  durch in die Cite-URL
- Cache-Control auf 1h reduziert (war 24h, zu aggressiv für
  Assessments die sich durch Re-Analyse ändern)

Tests: 194/194 grün.

Refs: #47, #60
2026-04-10 10:35:01 +02:00

360 lines
12 KiB
Python

"""SQLite database for job tracking."""
import aiosqlite
from datetime import datetime
from typing import Optional
from .config import settings
async def init_db():
"""Initialize database with tables."""
async with aiosqlite.connect(settings.db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'queued',
input_preview TEXT,
bundesland TEXT DEFAULT 'NRW',
model TEXT DEFAULT 'qwen-plus',
result TEXT,
html_path TEXT,
pdf_path TEXT,
error TEXT,
user_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# Assessments table for pre-computed + new analyses
await db.execute("""
CREATE TABLE IF NOT EXISTS assessments (
drucksache TEXT PRIMARY KEY,
title TEXT,
fraktionen TEXT, -- JSON array
datum TEXT,
link TEXT,
bundesland TEXT DEFAULT 'NRW',
gwoe_score REAL,
gwoe_begruendung TEXT,
gwoe_matrix TEXT, -- JSON array
gwoe_schwerpunkt TEXT, -- JSON array
wahlprogramm_scores TEXT, -- JSON array
verbesserungen TEXT, -- JSON array
staerken TEXT, -- JSON array
schwaechen TEXT, -- JSON array
empfehlung TEXT,
empfehlung_symbol TEXT,
verbesserungspotenzial TEXT,
themen TEXT, -- JSON array
antrag_zusammenfassung TEXT,
antrag_kernpunkte TEXT, -- JSON array
source TEXT DEFAULT 'batch', -- 'batch' or 'webapp'
model TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
await db.commit()
async def create_job(
job_id: str,
input_preview: str,
bundesland: str = "NRW",
model: str = "qwen-plus",
user_id: Optional[str] = None,
) -> dict:
"""Create a new analysis job."""
now = datetime.utcnow().isoformat()
async with aiosqlite.connect(settings.db_path) as db:
await db.execute(
"""
INSERT INTO jobs (id, input_preview, bundesland, model, user_id, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(job_id, input_preview, bundesland, model, user_id, now, now),
)
await db.commit()
return {"id": job_id, "status": "queued", "created_at": now}
async def get_job(job_id: str) -> Optional[dict]:
"""Get job by ID."""
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def update_job(job_id: str, **kwargs) -> bool:
"""Update job fields."""
if not kwargs:
return False
kwargs["updated_at"] = datetime.utcnow().isoformat()
fields = ", ".join(f"{k} = ?" for k in kwargs.keys())
values = list(kwargs.values()) + [job_id]
async with aiosqlite.connect(settings.db_path) as db:
await db.execute(f"UPDATE jobs SET {fields} WHERE id = ?", values)
await db.commit()
return True
async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]:
"""Get jobs for a user (for history page)."""
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(user_id, limit),
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def upsert_assessment(data: dict) -> bool:
"""Insert or update an assessment."""
import json
now = datetime.utcnow().isoformat()
async with aiosqlite.connect(settings.db_path) as db:
await db.execute("""
INSERT INTO assessments (
drucksache, title, fraktionen, datum, link, bundesland,
gwoe_score, gwoe_begruendung, gwoe_matrix, gwoe_schwerpunkt,
wahlprogramm_scores, verbesserungen, staerken, schwaechen,
empfehlung, empfehlung_symbol, verbesserungspotenzial,
themen, antrag_zusammenfassung, antrag_kernpunkte,
source, model, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(drucksache) DO UPDATE SET
title = excluded.title,
gwoe_score = excluded.gwoe_score,
gwoe_begruendung = excluded.gwoe_begruendung,
gwoe_matrix = excluded.gwoe_matrix,
updated_at = excluded.updated_at
""", (
data.get("drucksache"),
data.get("title"),
json.dumps(data.get("fraktionen", [])),
data.get("datum"),
data.get("link"),
data.get("bundesland", "NRW"),
data.get("gwoeScore"),
data.get("gwoeBegründung"),
json.dumps(data.get("gwoeMatrix", [])),
json.dumps(data.get("gwoeSchwerpunkt", [])),
json.dumps(data.get("wahlprogrammScores", [])),
json.dumps(data.get("verbesserungen", [])),
json.dumps(data.get("stärken", [])),
json.dumps(data.get("schwächen", [])),
data.get("empfehlung"),
data.get("empfehlungSymbol"),
data.get("verbesserungspotenzial"),
json.dumps(data.get("themen", [])),
data.get("antragZusammenfassung"),
json.dumps(data.get("antragKernpunkte", [])),
data.get("source", "webapp"),
data.get("model"),
now, now
))
await db.commit()
return True
async def get_assessment(drucksache: str) -> Optional[dict]:
"""Get assessment by drucksache ID."""
import json
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM assessments WHERE drucksache = ?", (drucksache,)
)
row = await cursor.fetchone()
if row:
d = dict(row)
# Parse JSON fields
for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
"wahlprogramm_scores", "verbesserungen", "staerken",
"schwaechen", "themen", "antrag_kernpunkte"]:
if d.get(field):
try:
d[field] = json.loads(d[field])
except:
pass
return d
return None
async def delete_assessment(drucksache: str) -> bool:
"""Delete an assessment by drucksache ID. Used by the cite-endpoint
to trigger re-analysis of Pre-#60 hallucinated assessments."""
async with aiosqlite.connect(settings.db_path) as db:
cursor = await db.execute(
"DELETE FROM assessments WHERE drucksache = ?", (drucksache,)
)
await db.commit()
return cursor.rowcount > 0
async def get_all_assessments(bundesland: str = None) -> list[dict]:
"""Get all assessments from database, optionally filtered by Bundesland.
The special value ``"ALL"`` and ``None`` mean no filter — both behave
identically and return every row. Any other value becomes a strict
``WHERE bundesland = ?`` match.
"""
import json
sql = "SELECT * FROM assessments"
params: list = []
if bundesland and bundesland != "ALL":
sql += " WHERE bundesland = ?"
params.append(bundesland)
sql += " ORDER BY gwoe_score DESC"
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(sql, params)
rows = await cursor.fetchall()
results = []
for row in rows:
d = dict(row)
# Parse JSON fields
for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
"wahlprogramm_scores", "verbesserungen", "staerken",
"schwaechen", "themen", "antrag_kernpunkte"]:
if d.get(field):
try:
d[field] = json.loads(d[field])
except:
pass
results.append(d)
return results
async def import_json_assessments(assessments_dir):
"""Import assessments from JSON files into database."""
import json
from pathlib import Path
dir_path = Path(assessments_dir)
if not dir_path.exists():
return 0
count = 0
for f in dir_path.glob("*.json"):
try:
data = json.loads(f.read_text())
data["source"] = "batch"
await upsert_assessment(data)
count += 1
except Exception as e:
print(f"Error importing {f}: {e}")
return count
def _parse_search_query(query: str) -> tuple[list[str], bool]:
"""
Parse search query for AND logic and exact phrases.
Returns: (terms, is_exact)
Examples:
- 'Klimaschutz Energie' -> (['klimaschutz', 'energie'], False)
- '"Grüner Stahl"' -> (['grüner stahl'], True)
"""
query = query.strip()
# Check for exact phrase (entire query in quotes)
if query.startswith('"') and query.endswith('"') and query.count('"') == 2:
exact = query[1:-1].strip()
return ([exact.lower()], True)
# Extract quoted phrases and regular terms
import logging
import shlex
try:
parts = shlex.split(query)
except ValueError as e:
# Unbalanced quote — fall back to whitespace split, but log so we
# notice patterns of malformed queries (Issue #57 Befund #7).
logging.getLogger(__name__).warning(
"shlex.split failed on search query (%s), falling back to whitespace split", e
)
parts = query.split()
return ([p.lower() for p in parts], False)
async def search_assessments(query: str, bundesland: str = None, limit: int = 50) -> list[dict]:
"""Search assessments by title, drucksache, or themen. Supports AND logic."""
import json
terms, is_exact = _parse_search_query(query)
# Build SQL for first term (to narrow down results)
first_term = terms[0] if terms else query.lower()
sql = """
SELECT * FROM assessments
WHERE (
LOWER(drucksache) LIKE ?
OR LOWER(title) LIKE ?
OR LOWER(themen) LIKE ?
OR LOWER(fraktionen) LIKE ?
)
"""
params = [f"%{first_term}%"] * 4
if bundesland and bundesland != "ALL":
sql += " AND bundesland = ?"
params.append(bundesland)
sql += " ORDER BY gwoe_score DESC LIMIT ?"
params.append(limit)
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(sql, params)
rows = await cursor.fetchall()
results = []
for row in rows:
d = dict(row)
for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
"wahlprogramm_scores", "verbesserungen", "staerken",
"schwaechen", "themen", "antrag_kernpunkte"]:
if d.get(field):
try:
d[field] = json.loads(d[field])
except:
pass
# Apply AND filter for multiple terms
if len(terms) > 1 or is_exact:
searchable = f"{d.get('title', '')} {d.get('drucksache', '')} {' '.join(d.get('fraktionen', []))} {' '.join(d.get('themen', []))}".lower()
if is_exact:
if terms[0] not in searchable:
continue
else:
if not all(term in searchable for term in terms):
continue
results.append(d)
return results