Queue-Persistenz: drucksache in jobs-Tabelle + stale Jobs nach Restart im Panel sichtbar
This commit is contained in:
parent
13714410ab
commit
e6e8787df8
@ -63,6 +63,11 @@ async def init_db():
|
||||
)
|
||||
""")
|
||||
|
||||
# Migration: drucksache-Spalte zu jobs (für Queue-Resume nach Restart)
|
||||
cols = {r[1] for r in await db.execute("PRAGMA table_info(jobs)")}
|
||||
if "drucksache" not in cols:
|
||||
await db.execute("ALTER TABLE jobs ADD COLUMN drucksache TEXT")
|
||||
|
||||
# Bookmarks (#94)
|
||||
await db.execute("""
|
||||
CREATE TABLE IF NOT EXISTS bookmarks (
|
||||
@ -201,16 +206,17 @@ async def create_job(
|
||||
bundesland: str = "NRW",
|
||||
model: str = "qwen-plus",
|
||||
user_id: Optional[str] = None,
|
||||
drucksache: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Create a new analysis job."""
|
||||
now = datetime.utcnow().isoformat()
|
||||
async with aiosqlite.connect(settings.db_path) as db:
|
||||
await db.execute(
|
||||
"""
|
||||
INSERT INTO jobs (id, input_preview, bundesland, model, user_id, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO jobs (id, input_preview, bundesland, model, user_id, drucksache, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(job_id, input_preview, bundesland, model, user_id, now, now),
|
||||
(job_id, input_preview, bundesland, model, user_id, drucksache, now, now),
|
||||
)
|
||||
await db.commit()
|
||||
return {"id": job_id, "status": "queued", "created_at": now}
|
||||
|
||||
@ -681,7 +681,7 @@ async def batch_analyze(
|
||||
continue
|
||||
# Enqueue
|
||||
job_id = str(uuid.uuid4())
|
||||
await create_job(job_id, text[:500], bundesland, "qwen-plus")
|
||||
await create_job(job_id, text[:500], bundesland, "qwen-plus", drucksache=doc.drucksache)
|
||||
try:
|
||||
position = await enqueue(
|
||||
job_id,
|
||||
@ -743,7 +743,7 @@ async def analyze_drucksache(
|
||||
# Create job and enqueue (#95)
|
||||
from .queue import enqueue, QueueFullError
|
||||
job_id = str(uuid.uuid4())
|
||||
await create_job(job_id, text[:500], bundesland, model)
|
||||
await create_job(job_id, text[:500], bundesland, model, drucksache=drucksache)
|
||||
|
||||
try:
|
||||
position = await enqueue(
|
||||
|
||||
63
app/queue.py
63
app/queue.py
@ -80,6 +80,24 @@ def get_queue_status() -> dict:
|
||||
# Jobs nach Status gruppieren
|
||||
recent_jobs = sorted(_jobs.values(), key=lambda j: j.get("enqueued_at", 0), reverse=True)[:30]
|
||||
|
||||
# Stale jobs aus DB laden (nach Container-Restart)
|
||||
stale_jobs = []
|
||||
try:
|
||||
import sqlite3
|
||||
from .config import settings
|
||||
conn = sqlite3.connect(settings.db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
"SELECT id, bundesland, status, created_at FROM jobs "
|
||||
"WHERE status IN ('stale', 'queued', 'processing') ORDER BY created_at DESC LIMIT 20"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
stale_jobs = [{"job_id": r["id"], "bundesland": r["bundesland"] or "",
|
||||
"status": "stale", "drucksache": r["drucksache"] if "drucksache" in r.keys() else "",
|
||||
"duration": None, "error": "Container-Restart"} for r in rows]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"pending": pending,
|
||||
"max_size": MAX_QUEUE_SIZE,
|
||||
@ -96,7 +114,7 @@ def get_queue_status() -> dict:
|
||||
"status": j["status"],
|
||||
"duration": round(j["duration"], 1) if j.get("duration") else None,
|
||||
"error": j.get("error"),
|
||||
} for jid, j in list(_jobs.items())[-30:]],
|
||||
} for jid, j in list(_jobs.items())[-30:]] + stale_jobs,
|
||||
}
|
||||
|
||||
|
||||
@ -200,23 +218,46 @@ async def graceful_shutdown(timeout: int = 900):
|
||||
|
||||
|
||||
async def re_enqueue_pending():
    """Mark jobs left unfinished by a previous run as ``stale``.

    Selects every row of the ``jobs`` table whose status is ``queued`` or
    ``processing`` and sets its status to ``stale``, refreshing
    ``updated_at``.  Despite the function name, nothing is put back on the
    queue here: the jobs are only flagged so that they can be re-triggered
    manually via the UI.

    All rows are updated over a single connection and committed once, so
    either every pending job is flagged or none is.

    Returns:
        None.  Returns early without writing when no pending jobs exist.
    """
    # Function-scope imports, as in the original implementation.
    import aiosqlite
    from .config import settings

    async with aiosqlite.connect(settings.db_path) as db:
        cursor = await db.execute(
            "SELECT id FROM jobs "
            "WHERE status IN ('queued', 'processing') ORDER BY created_at"
        )
        # One parameter tuple per job, ready for executemany().
        job_params = [(row[0],) for row in await cursor.fetchall()]

        if not job_params:
            return

        logger.info(
            "Found %d pending jobs from previous run", len(job_params)
        )

        await db.executemany(
            "UPDATE jobs SET status='stale', updated_at=datetime('now') WHERE id=?",
            job_params,
        )
        await db.commit()

    logger.info("Marked %d jobs as stale (re-trigger via UI)", len(job_params))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user