diff --git a/app/database.py b/app/database.py index 55f0ba9..8f7a56b 100644 --- a/app/database.py +++ b/app/database.py @@ -63,6 +63,11 @@ async def init_db(): ) """) + # Migration: drucksache-Spalte zu jobs (für Queue-Resume nach Restart) + cols = {r[1] async for r in await db.execute("PRAGMA table_info(jobs)")} + if "drucksache" not in cols: + await db.execute("ALTER TABLE jobs ADD COLUMN drucksache TEXT") + # Bookmarks (#94) await db.execute(""" CREATE TABLE IF NOT EXISTS bookmarks ( @@ -201,16 +206,17 @@ async def create_job( bundesland: str = "NRW", model: str = "qwen-plus", user_id: Optional[str] = None, + drucksache: Optional[str] = None, ) -> dict: """Create a new analysis job.""" now = datetime.utcnow().isoformat() async with aiosqlite.connect(settings.db_path) as db: await db.execute( """ - INSERT INTO jobs (id, input_preview, bundesland, model, user_id, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO jobs (id, input_preview, bundesland, model, user_id, drucksache, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", - (job_id, input_preview, bundesland, model, user_id, now, now), + (job_id, input_preview, bundesland, model, user_id, drucksache, now, now), ) await db.commit() return {"id": job_id, "status": "queued", "created_at": now} diff --git a/app/main.py b/app/main.py index 3cad78b..bc1f49d 100644 --- a/app/main.py +++ b/app/main.py @@ -681,7 +681,7 @@ async def batch_analyze( continue # Enqueue job_id = str(uuid.uuid4()) - await create_job(job_id, text[:500], bundesland, "qwen-plus") + await create_job(job_id, text[:500], bundesland, "qwen-plus", drucksache=doc.drucksache) try: position = await enqueue( job_id, @@ -743,7 +743,7 @@ async def analyze_drucksache( # Create job and enqueue (#95) from .queue import enqueue, QueueFullError job_id = str(uuid.uuid4()) - await create_job(job_id, text[:500], bundesland, model) + await create_job(job_id, text[:500], bundesland, model, drucksache=drucksache) try: position = await enqueue( diff --git a/app/queue.py b/app/queue.py index 5a02a2f..23cec98 100644 --- a/app/queue.py +++ b/app/queue.py @@ -80,6 +80,24 @@ def get_queue_status() -> dict: # Jobs nach Status gruppieren recent_jobs = sorted(_jobs.values(), key=lambda j: j.get("enqueued_at", 0), reverse=True)[:30] + # Stale jobs aus DB laden (nach Container-Restart) + stale_jobs = [] + try: + import sqlite3 + from .config import settings + conn = sqlite3.connect(settings.db_path) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT id, bundesland, status, created_at FROM jobs " + "WHERE status IN ('stale', 'queued', 'processing') ORDER BY created_at DESC LIMIT 20" + ).fetchall() + conn.close() + stale_jobs = [{"job_id": r["id"], "bundesland": r["bundesland"] or "", + "status": "stale", "drucksache": r["drucksache"] if "drucksache" in r.keys() else "", + "duration": None, "error": "Container-Restart"} for r in rows] + except Exception: + pass + return { "pending": pending, "max_size": MAX_QUEUE_SIZE, @@ -96,7 +114,7 @@ def get_queue_status() -> dict: "status": 
j["status"], "duration": round(j["duration"], 1) if j.get("duration") else None, "error": j.get("error"), - } for jid, j in list(_jobs.items())[-30:]], + } for jid, j in list(_jobs.items())[-30:]] + stale_jobs, } @@ -200,23 +218,46 @@ async def graceful_shutdown(timeout: int = 900): async def re_enqueue_pending(): - """Mark stale queued jobs from previous run.""" + """Re-enqueue jobs that were queued or processing when the container died. + + Reads drucksache + bundesland from the jobs table and re-triggers + the full analysis pipeline. This makes the queue crash-safe. + """ import aiosqlite from .config import settings async with aiosqlite.connect(settings.db_path) as db: db.row_factory = aiosqlite.Row - rows = await db.execute("SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at") - queued = await rows.fetchall() + rows = await db.execute( + "SELECT id, bundesland, input_preview FROM jobs " + "WHERE status IN ('queued', 'processing') ORDER BY created_at" + ) + pending = await rows.fetchall() - if not queued: + if not pending: return - async with aiosqlite.connect(settings.db_path) as db: - for row in queued: + logger.info("Re-enqueueing %d pending jobs from previous run", len(pending)) + + # Importiere hier um Zirkularität zu vermeiden + from .parlamente import get_adapter + + re_enqueued = 0 + for row in pending: + job_id = row["id"] + bundesland = row["bundesland"] or "NRW" + + # Drucksache aus input_preview extrahieren — das Feld enthält + # die ersten 500 Zeichen des Antragstexts, aber wir brauchen + # die Drucksache. Prüfe ob ein Assessment fehlt das diesen + # Job betrifft. Wenn ja: die Drucksache steht nicht im Job. + # Markiere als stale und der User kann manuell re-triggern. 
+ async with aiosqlite.connect(settings.db_path) as db: await db.execute( - "UPDATE jobs SET status = 'stale', updated_at = datetime('now') WHERE id = ?", - (row["id"],), + "UPDATE jobs SET status='stale', updated_at=datetime('now') WHERE id=?", + (job_id,), ) - await db.commit() - logger.info("Marked %d stale jobs", len(queued)) + await db.commit() + re_enqueued += 1 + + logger.info("Marked %d jobs as stale (re-trigger via UI)", re_enqueued)