Statt eine Nachricht "Textstelle nicht auffindbar" zu zeigen (was User zurecht als Quatsch bezeichnet hat), erkennt der Cite-Endpoint jetzt halluzinierte Zitate und triggert automatisch eine Re-Analyse: Flow: 1. User klickt auf Zitat-Link 2. render_highlighted_page gibt (pdf, page, highlighted=False) zurück 3. Endpoint prüft: ds+bl Parameter vorhanden? Assessment in DB? 4. → Löscht altes Assessment, startet Re-Analyse als Background-Task 5. → Zeigt HTML-Warte-Seite mit Spinner und "Wird neu analysiert..." 6. → Auto-Redirect nach 15s zurück zum Assessment Das neue Assessment hat durch reconstruct_zitate verifizierte Zitate, die dann beim nächsten Klick korrekt gehighlighted werden. Änderungen: - embeddings.render_highlighted_page: Return-Typ (bytes, int, bool) — drittes Element ist True wenn Highlight gesetzt wurde - database.delete_assessment: neue Funktion für die Re-Analyse - main.py cite-Endpoint: akzeptiert ds= und bl= als optionale Params, triggert Re-Analyse bei highlighted=False + ds vorhanden - Frontend: makeCiteUrl reicht ds+bl aus dem Assessment-Kontext mit durch in die Cite-URL - Cache-Control auf 1h reduziert (war 24h, zu aggressiv für Assessments die sich durch Re-Analyse ändern) Tests: 194/194 grün. Refs: #47, #60
360 lines · 12 KiB · Python
"""SQLite database for job tracking."""
|
|
|
|
import aiosqlite
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from .config import settings
|
|
|
|
|
|
async def init_db() -> None:
    """Initialize the SQLite database, creating all tables if absent.

    Creates two tables:
      * ``jobs`` -- one row per analysis job submitted through the webapp,
        tracking status, result paths, and ownership.
      * ``assessments`` -- pre-computed (batch) and webapp-generated
        assessments, keyed by Drucksache number; list-valued fields are
        stored as JSON-encoded TEXT columns.

    Idempotent: uses ``CREATE TABLE IF NOT EXISTS`` so it is safe to call
    on every application startup.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                status TEXT NOT NULL DEFAULT 'queued',
                input_preview TEXT,
                bundesland TEXT DEFAULT 'NRW',
                model TEXT DEFAULT 'qwen-plus',
                result TEXT,
                html_path TEXT,
                pdf_path TEXT,
                error TEXT,
                user_id TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

        # Assessments table for pre-computed + new analyses
        await db.execute("""
            CREATE TABLE IF NOT EXISTS assessments (
                drucksache TEXT PRIMARY KEY,
                title TEXT,
                fraktionen TEXT, -- JSON array
                datum TEXT,
                link TEXT,
                bundesland TEXT DEFAULT 'NRW',

                gwoe_score REAL,
                gwoe_begruendung TEXT,
                gwoe_matrix TEXT, -- JSON array
                gwoe_schwerpunkt TEXT, -- JSON array

                wahlprogramm_scores TEXT, -- JSON array

                verbesserungen TEXT, -- JSON array
                staerken TEXT, -- JSON array
                schwaechen TEXT, -- JSON array

                empfehlung TEXT,
                empfehlung_symbol TEXT,
                verbesserungspotenzial TEXT,

                themen TEXT, -- JSON array
                antrag_zusammenfassung TEXT,
                antrag_kernpunkte TEXT, -- JSON array

                source TEXT DEFAULT 'batch', -- 'batch' or 'webapp'
                model TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

        await db.commit()

async def create_job(
    job_id: str,
    input_preview: str,
    bundesland: str = "NRW",
    model: str = "qwen-plus",
    user_id: Optional[str] = None,
) -> dict:
    """Insert a new analysis job row and return its initial state.

    The job starts in the default 'queued' status; created_at and
    updated_at are both set to the current UTC timestamp.
    """
    timestamp = datetime.utcnow().isoformat()
    row = (job_id, input_preview, bundesland, model, user_id, timestamp, timestamp)

    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(
            """
            INSERT INTO jobs (id, input_preview, bundesland, model, user_id, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        await db.commit()

    return {"id": job_id, "status": "queued", "created_at": timestamp}

|
async def get_job(job_id: str) -> Optional[dict]:
    """Fetch a single job record by its ID; None when no such job exists."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
        record = await cursor.fetchone()
    # sqlite3.Row keeps its data after the connection closes, so the
    # conversion is safe outside the context manager.
    return dict(record) if record is not None else None

# Columns of the ``jobs`` table that update_job() may modify. Guards the
# f-string-built SET clause below against SQL injection via field names.
_UPDATABLE_JOB_FIELDS = frozenset({
    "status", "input_preview", "bundesland", "model", "result",
    "html_path", "pdf_path", "error", "user_id", "updated_at",
})


async def update_job(job_id: str, **kwargs) -> bool:
    """Update arbitrary fields of a job row.

    ``updated_at`` is always refreshed to the current UTC timestamp.

    Returns:
        False when called with no fields, True otherwise (even if the
        job_id matched no row -- preserved for backward compatibility).

    Raises:
        ValueError: if a keyword does not name an updatable jobs column.
        This replaces the previous behavior of interpolating arbitrary
        keys directly into the SQL statement.
    """
    if not kwargs:
        return False

    kwargs["updated_at"] = datetime.utcnow().isoformat()

    # Validate field names before they reach the f-string SQL below.
    unknown = set(kwargs) - _UPDATABLE_JOB_FIELDS
    if unknown:
        raise ValueError(f"Unknown job field(s): {', '.join(sorted(unknown))}")

    fields = ", ".join(f"{k} = ?" for k in kwargs.keys())
    values = list(kwargs.values()) + [job_id]

    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(f"UPDATE jobs SET {fields} WHERE id = ?", values)
        await db.commit()
        return True

async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]:
    """Return a user's most recent jobs, newest first (for the history page)."""
    query = "SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?"
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(query, (user_id, limit))
        fetched = await cursor.fetchall()
    return [dict(r) for r in fetched]

async def upsert_assessment(data: dict) -> bool:
    """Insert or update an assessment keyed by its Drucksache number.

    *data* uses the camelCase / German keys emitted by the analysis
    pipeline (e.g. ``gwoeScore``, ``stärken``); list-valued fields are
    serialized to JSON strings for storage.

    Fix: the ON CONFLICT clause previously refreshed only ``title`` and
    the GWÖ columns, so a re-analysis of an existing Drucksache left
    stale scores, strengths/weaknesses, recommendations, summaries etc.
    in the row. It now refreshes every content column; only
    ``created_at`` keeps the original insertion time.
    """
    import json
    now = datetime.utcnow().isoformat()

    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute("""
            INSERT INTO assessments (
                drucksache, title, fraktionen, datum, link, bundesland,
                gwoe_score, gwoe_begruendung, gwoe_matrix, gwoe_schwerpunkt,
                wahlprogramm_scores, verbesserungen, staerken, schwaechen,
                empfehlung, empfehlung_symbol, verbesserungspotenzial,
                themen, antrag_zusammenfassung, antrag_kernpunkte,
                source, model, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drucksache) DO UPDATE SET
                title = excluded.title,
                fraktionen = excluded.fraktionen,
                datum = excluded.datum,
                link = excluded.link,
                bundesland = excluded.bundesland,
                gwoe_score = excluded.gwoe_score,
                gwoe_begruendung = excluded.gwoe_begruendung,
                gwoe_matrix = excluded.gwoe_matrix,
                gwoe_schwerpunkt = excluded.gwoe_schwerpunkt,
                wahlprogramm_scores = excluded.wahlprogramm_scores,
                verbesserungen = excluded.verbesserungen,
                staerken = excluded.staerken,
                schwaechen = excluded.schwaechen,
                empfehlung = excluded.empfehlung,
                empfehlung_symbol = excluded.empfehlung_symbol,
                verbesserungspotenzial = excluded.verbesserungspotenzial,
                themen = excluded.themen,
                antrag_zusammenfassung = excluded.antrag_zusammenfassung,
                antrag_kernpunkte = excluded.antrag_kernpunkte,
                source = excluded.source,
                model = excluded.model,
                updated_at = excluded.updated_at
        """, (
            data.get("drucksache"),
            data.get("title"),
            json.dumps(data.get("fraktionen", [])),
            data.get("datum"),
            data.get("link"),
            data.get("bundesland", "NRW"),
            data.get("gwoeScore"),
            data.get("gwoeBegründung"),
            json.dumps(data.get("gwoeMatrix", [])),
            json.dumps(data.get("gwoeSchwerpunkt", [])),
            json.dumps(data.get("wahlprogrammScores", [])),
            json.dumps(data.get("verbesserungen", [])),
            json.dumps(data.get("stärken", [])),
            json.dumps(data.get("schwächen", [])),
            data.get("empfehlung"),
            data.get("empfehlungSymbol"),
            data.get("verbesserungspotenzial"),
            json.dumps(data.get("themen", [])),
            data.get("antragZusammenfassung"),
            json.dumps(data.get("antragKernpunkte", [])),
            data.get("source", "webapp"),
            data.get("model"),
            now, now
        ))
        await db.commit()
        return True

async def get_assessment(drucksache: str) -> Optional[dict]:
    """Get an assessment by Drucksache ID, with JSON columns decoded.

    Returns None when no row exists. Columns holding JSON arrays are
    parsed in place; a malformed value is left as the raw string rather
    than failing the whole lookup.
    """
    import json
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM assessments WHERE drucksache = ?", (drucksache,)
        )
        row = await cursor.fetchone()
        if row is None:
            return None

        d = dict(row)
        # Decode the JSON-array columns written by upsert_assessment().
        for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                      "wahlprogramm_scores", "verbesserungen", "staerken",
                      "schwaechen", "themen", "antrag_kernpunkte"]:
            if d.get(field):
                try:
                    d[field] = json.loads(d[field])
                except (json.JSONDecodeError, TypeError):
                    # Narrowed from a bare `except:` -- tolerate only bad
                    # JSON, never mask unrelated errors (e.g. KeyboardInterrupt).
                    pass
        return d

async def delete_assessment(drucksache: str) -> bool:
    """Remove the assessment row for *drucksache*.

    Used by the cite-endpoint to trigger re-analysis of Pre-#60
    hallucinated assessments.

    Returns True only when a row was actually deleted.
    """
    async with aiosqlite.connect(settings.db_path) as db:
        result = await db.execute(
            "DELETE FROM assessments WHERE drucksache = ?", (drucksache,)
        )
        await db.commit()
        return result.rowcount > 0

async def get_all_assessments(bundesland: Optional[str] = None) -> list[dict]:
    """Get all assessments, optionally filtered by Bundesland.

    The special value ``"ALL"`` and ``None`` mean no filter — both behave
    identically and return every row. Any other value becomes a strict
    ``WHERE bundesland = ?`` match. Rows are ordered by gwoe_score,
    highest first.
    """
    import json
    sql = "SELECT * FROM assessments"
    params: list = []
    if bundesland and bundesland != "ALL":
        sql += " WHERE bundesland = ?"
        params.append(bundesland)
    sql += " ORDER BY gwoe_score DESC"

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        d = dict(row)
        # Decode the JSON-array columns written by upsert_assessment().
        for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                      "wahlprogramm_scores", "verbesserungen", "staerken",
                      "schwaechen", "themen", "antrag_kernpunkte"]:
            if d.get(field):
                try:
                    d[field] = json.loads(d[field])
                except (json.JSONDecodeError, TypeError):
                    # Narrowed from a bare `except:` -- keep the raw string
                    # on malformed JSON instead of masking other errors.
                    pass
        results.append(d)
    return results

async def import_json_assessments(assessments_dir) -> int:
    """Import assessment JSON files from *assessments_dir* into the database.

    Each ``*.json`` file is upserted with ``source='batch'``. Files that
    fail to parse or insert are logged and skipped (best-effort import).

    Returns:
        The number of successfully imported files (0 if the directory
        does not exist).
    """
    import json
    import logging
    from pathlib import Path

    dir_path = Path(assessments_dir)
    if not dir_path.exists():
        return 0

    count = 0
    for f in dir_path.glob("*.json"):
        try:
            # Explicit encoding: assessment files are written as UTF-8 and
            # must not depend on the platform's default locale encoding.
            data = json.loads(f.read_text(encoding="utf-8"))
            data["source"] = "batch"
            await upsert_assessment(data)
            count += 1
        except Exception as e:
            # Deliberately broad: one bad file must not abort the batch.
            # Logged (was print) so failures surface in the app log.
            logging.getLogger(__name__).warning("Error importing %s: %s", f, e)

    return count

def _parse_search_query(query: str) -> tuple[list[str], bool]:
|
|
"""
|
|
Parse search query for AND logic and exact phrases.
|
|
Returns: (terms, is_exact)
|
|
|
|
Examples:
|
|
- 'Klimaschutz Energie' -> (['klimaschutz', 'energie'], False)
|
|
- '"Grüner Stahl"' -> (['grüner stahl'], True)
|
|
"""
|
|
query = query.strip()
|
|
|
|
# Check for exact phrase (entire query in quotes)
|
|
if query.startswith('"') and query.endswith('"') and query.count('"') == 2:
|
|
exact = query[1:-1].strip()
|
|
return ([exact.lower()], True)
|
|
|
|
# Extract quoted phrases and regular terms
|
|
import logging
|
|
import shlex
|
|
try:
|
|
parts = shlex.split(query)
|
|
except ValueError as e:
|
|
# Unbalanced quote — fall back to whitespace split, but log so we
|
|
# notice patterns of malformed queries (Issue #57 Befund #7).
|
|
logging.getLogger(__name__).warning(
|
|
"shlex.split failed on search query (%s), falling back to whitespace split", e
|
|
)
|
|
parts = query.split()
|
|
|
|
return ([p.lower() for p in parts], False)
|
|
|
|
|
|
async def search_assessments(query: str, bundesland: Optional[str] = None, limit: int = 50) -> list[dict]:
    """Search assessments by title, drucksache, themen, or fraktionen.

    Supports AND logic for multi-term queries and exact-phrase matching
    for fully-quoted queries (see _parse_search_query). The SQL LIKE
    narrows candidates by the first term only; the remaining terms and
    exact phrases are enforced in Python on the fetched rows.
    """
    import json

    terms, is_exact = _parse_search_query(query)

    # First term narrows the SQL result set; falls back to the raw query
    # for an empty/whitespace-only input.
    first_term = terms[0] if terms else query.lower()

    sql = """
        SELECT * FROM assessments
        WHERE (
            LOWER(drucksache) LIKE ?
            OR LOWER(title) LIKE ?
            OR LOWER(themen) LIKE ?
            OR LOWER(fraktionen) LIKE ?
        )
    """
    params = [f"%{first_term}%"] * 4

    if bundesland and bundesland != "ALL":
        sql += " AND bundesland = ?"
        params.append(bundesland)

    sql += " ORDER BY gwoe_score DESC LIMIT ?"
    params.append(limit)

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        d = dict(row)
        # Decode the JSON-array columns written by upsert_assessment().
        for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                      "wahlprogramm_scores", "verbesserungen", "staerken",
                      "schwaechen", "themen", "antrag_kernpunkte"]:
            if d.get(field):
                try:
                    d[field] = json.loads(d[field])
                except (json.JSONDecodeError, TypeError):
                    # Narrowed from a bare `except:` -- keep the raw string
                    # on malformed JSON instead of masking other errors.
                    pass

        # Apply AND filter / exact-phrase filter for multi-term queries.
        if len(terms) > 1 or is_exact:
            searchable = f"{d.get('title', '')} {d.get('drucksache', '')} {' '.join(d.get('fraktionen', []))} {' '.join(d.get('themen', []))}".lower()
            if is_exact:
                if terms[0] not in searchable:
                    continue
            else:
                if not all(term in searchable for term in terms):
                    continue

        results.append(d)

    return results
