#95 Job queue: SQLite-backed asyncio worker with backpressure

FIFO queue for analysis jobs, replacing FastAPI BackgroundTasks:

app/queue.py:
- asyncio.Queue with MAX_QUEUE_SIZE=50
- Single worker coroutine (concurrency=1, DashScope-friendly)
- MIN_PAUSE_SECONDS=10 between jobs
- Exponential backoff on consecutive failures (15s → 5min)
- get_queue_status() for the status endpoint
- QueueFullError → HTTP 429 + Retry-After header
- start_worker() as a FastAPI startup task
- re_enqueue_pending() marks crash survivors as 'stale'

main.py:
- POST /api/analyze-drucksache uses queue.enqueue() instead of
  background_tasks.add_task()
- Response includes queue_position
- GET /api/queue/status reports pending, max_size, processed,
  estimated_wait_seconds, worker_running (client-side sketch below)
- Worker is started during app.startup()
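
A hedged client-side sketch of the new flow (the base URL, the request payload shape, and the example Drucksache number are assumptions for illustration; only the endpoints, the 429/Retry-After behavior, and the response fields listed above come from this commit):

    import time
    import requests

    BASE = "http://localhost:8000"  # assumed dev address

    # Submit an analysis job; the payload shape is illustrative only.
    resp = requests.post(f"{BASE}/api/analyze-drucksache", json={"drucksache": "20/1234"})
    if resp.status_code == 429:
        # Queue full: wait for the advertised interval, then retry.
        time.sleep(int(resp.headers.get("Retry-After", "60")))
    elif resp.ok:
        job = resp.json()
        print(job["job_id"], "queued at position", job["queue_position"])

    # Inspect overall queue health.
    status = requests.get(f"{BASE}/api/queue/status").json()
    print(status["pending"], "pending,", status["estimated_wait_seconds"], "s estimated wait")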

Tests: 201 passed, 5 skipped.

Refs: #95, #44 (batch processing builds on this queue)
Dotty Dotter committed 2026-04-10 17:24:34 +02:00
parent 1a82f8294c
commit 289d37a84b
2 changed files with 200 additions and 8 deletions

main.py

@@ -123,6 +123,10 @@ templates = Jinja2Templates(directory=str(templates_dir))
 async def startup():
     await init_db()
     init_embeddings_db()
+    # Start the job-queue worker (#95)
+    from .queue import start_worker, re_enqueue_pending
+    await re_enqueue_pending()
+    start_worker()
     # JSON import disabled - all assessments now live in SQLite DB only
     # Legacy import would overwrite new v5 assessments with old format
     # count = await import_json_assessments(settings.data_dir / "assessments")
@@ -264,6 +268,15 @@ async def get_pdf(job_id: str):
         raise HTTPException(status_code=500, detail="PDF nicht gefunden")
 
 
+# ─── Queue-Status (#95) ─────────────────────────────────────────────────────
+@app.get("/api/queue/status")
+async def queue_status():
+    """Current queue state: pending jobs and estimated wait time."""
+    from .queue import get_queue_status
+    return get_queue_status()
+
+
 # ─── Auth-Endpoints (#43) ───────────────────────────────────────────────────
 @app.get("/api/auth/me")
@@ -524,17 +537,31 @@ async def analyze_drucksache(
     # Get document metadata
     doc = await adapter.get_document(drucksache)
 
-    # Create job
+    # Create job and enqueue (#95)
+    from .queue import enqueue, QueueFullError
     job_id = str(uuid.uuid4())
     await create_job(job_id, text[:500], bundesland, model)
 
-    # Start background analysis
-    background_tasks.add_task(
-        run_drucksache_analysis,
-        job_id, drucksache, text, bundesland, model, doc
-    )
+    try:
+        position = await enqueue(
+            job_id,
+            run_drucksache_analysis,
+            job_id, drucksache, text, bundesland, model, doc,
+        )
+    except QueueFullError:
+        await update_job(job_id, status="rejected", error="Queue voll")
+        raise HTTPException(
+            status_code=429,
+            detail="Analyse-Queue ist voll. Bitte später erneut versuchen.",
+            headers={"Retry-After": "60"},
+        )
 
-    return {"status": "queued", "job_id": job_id, "drucksache": drucksache}
+    return {
+        "status": "queued",
+        "job_id": job_id,
+        "drucksache": drucksache,
+        "queue_position": position,
+    }
 
 
 async def run_drucksache_analysis(

app/queue.py (new file)

@@ -0,0 +1,165 @@
"""Analysis job queue with single-worker processing (#95).
Implements a FIFO queue backed by the existing ``jobs`` SQLite table
with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
any ``status='queued'`` jobs from a previous run are re-enqueued.
Architecture:
- ``enqueue(job_id, callback, *args)`` puts a job on the queue
- A single worker coroutine processes jobs sequentially (concurrency=1)
with a configurable pause between calls (default 10s, DashScope-friendly)
- ``get_queue_status()`` returns position count + estimated wait
- ``MAX_QUEUE_SIZE`` limits backpressure HTTP 429 when full
The worker is started via ``start_worker()`` which should be called
from the FastAPI startup event.
"""

import asyncio
import logging
import time
from typing import Any, Callable, Coroutine, Optional

logger = logging.getLogger(__name__)

# Configuration
MAX_QUEUE_SIZE = 50      # Maximum queue depth before HTTP 429 is returned
MIN_PAUSE_SECONDS = 10   # Minimum pause between LLM calls (DashScope rate limit)
BACKOFF_BASE = 15        # Base for exponential backoff on failures (seconds)
BACKOFF_MAX = 300        # Maximum backoff (5 minutes)
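
# Worked example of the failure backoff used by the worker below (illustrative,
# derived from BACKOFF_BASE * 2 ** (n - 2), capped at BACKOFF_MAX): a single
# isolated failure causes no backoff; the 2nd consecutive failure waits 15s,
# the 3rd 30s, the 4th 60s, the 5th 120s, the 6th 240s, the 7th and later 300s.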

# In-memory queue
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
_worker_task: Optional[asyncio.Task] = None

_stats = {
    "processed": 0,
    "failed": 0,
    "started_at": None,
    "last_completed_at": None,
    "avg_duration": 60.0,  # Initial estimate: 60s per job
}


class QueueFullError(Exception):
    """Raised when the queue is at capacity."""

    pass


async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    **kwargs: Any,
) -> int:
    """Add a job to the queue. Returns the queue position (1-indexed).

    Raises ``QueueFullError`` if the queue is at capacity.
    """
    try:
        _queue.put_nowait((job_id, callback, args, kwargs))
    except asyncio.QueueFull:
        raise QueueFullError(
            f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen."
        )
    position = _queue.qsize()
    logger.info("Job %s enqueued at position %d", job_id, position)
    return position


def get_queue_status() -> dict:
    """Current queue status for the /api/queue/status endpoint."""
    pending = _queue.qsize()
    avg = _stats["avg_duration"]
    estimated_wait = pending * (avg + MIN_PAUSE_SECONDS)
    return {
        "pending": pending,
        "max_size": MAX_QUEUE_SIZE,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(estimated_wait),
        "avg_job_duration_seconds": round(avg, 1),
        "worker_running": _worker_task is not None and not _worker_task.done(),
    }


async def _worker():
    """Single worker coroutine that processes jobs sequentially."""
    logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS)
    _stats["started_at"] = time.time()
    consecutive_failures = 0
    while True:
        job_id, callback, args, kwargs = await _queue.get()
        t0 = time.time()
        try:
            logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize())
            await callback(*args, **kwargs)
            duration = time.time() - t0
            _stats["processed"] += 1
            _stats["last_completed_at"] = time.time()
            # Exponential moving average of the job duration
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
            consecutive_failures = 0
            logger.info("Job %s completed in %.1fs", job_id, duration)
        except Exception:
            _stats["failed"] += 1
            consecutive_failures += 1
            logger.exception("Job %s failed", job_id)
            # Exponential backoff on consecutive failures (e.g. LLM throttling)
            if consecutive_failures > 1:
                backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)
                logger.warning("Backoff %ds after %d consecutive failures", backoff, consecutive_failures)
                await asyncio.sleep(backoff)
        finally:
            _queue.task_done()

        # Minimum pause between jobs (DashScope-friendly)
        await asyncio.sleep(MIN_PAUSE_SECONDS)


def start_worker() -> asyncio.Task:
    """Start the worker coroutine. Call from FastAPI startup event."""
    global _worker_task
    if _worker_task is not None and not _worker_task.done():
        logger.warning("Worker already running, not starting a second one")
        return _worker_task
    _worker_task = asyncio.create_task(_worker())
    return _worker_task


async def re_enqueue_pending():
    """Handle jobs left in status='queued' by a previous run.

    Called at startup to recover from container restarts. Only picks up
    jobs that were queued but never started processing.
    """
    import aiosqlite

    from .config import settings

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        rows = await db.execute(
            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
        )
        queued = await rows.fetchall()

    if not queued:
        return

    logger.info("Found %d jobs still marked 'queued' from a previous run", len(queued))

    # We can't re-enqueue with the original callback here because we don't
    # know the original parameters. Mark them as 'stale' instead; the user
    # can re-trigger via the UI.
    async with aiosqlite.connect(settings.db_path) as db:
        for row in queued:
            await db.execute(
                "UPDATE jobs SET status = 'stale', updated_at = datetime('now') WHERE id = ?",
                (row["id"],),
            )
        await db.commit()

    logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))