FIFO-Queue für Analyse-Jobs — ersetzt FastAPI BackgroundTasks: app/queue.py: - asyncio.Queue mit MAX_QUEUE_SIZE=50 - Einzelner Worker-Coroutine (Concurrency=1, DashScope-freundlich) - MIN_PAUSE_SECONDS=10 zwischen Jobs - Exponentielles Backoff bei Serien-Fehlern (15s → 5min) - get_queue_status() für den Status-Endpoint - QueueFullError → HTTP 429 + Retry-After Header - start_worker() als FastAPI-Startup-Task - re_enqueue_pending() markiert Crash-Überlebende als 'stale' main.py: - POST /api/analyze-drucksache nutzt queue.enqueue() statt background_tasks.add_task() - Response enthält queue_position - GET /api/queue/status zeigt pending, max_size, processed, estimated_wait_seconds, worker_running - Worker wird bei app.startup() gestartet Tests: 201 passed, 5 skipped. Refs: #95, #44 (Batch baut auf Queue auf)
166 lines
5.7 KiB
Python
166 lines
5.7 KiB
Python
"""Analysis job queue with single-worker processing (#95).
|
|
|
|
Implements a FIFO queue backed by the existing ``jobs`` SQLite table
|
|
with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
|
|
any ``status='queued'`` jobs from a previous run are re-enqueued.
|
|
|
|
Architecture:
|
|
- ``enqueue(job_id, callback, *args)`` → puts a job on the queue
|
|
- A single worker coroutine processes jobs sequentially (concurrency=1)
|
|
with a configurable pause between calls (default 10s, DashScope-friendly)
|
|
- ``get_queue_status()`` returns position count + estimated wait
|
|
- ``MAX_QUEUE_SIZE`` limits backpressure → HTTP 429 when full
|
|
|
|
The worker is started via ``start_worker()`` which should be called
|
|
from the FastAPI startup event.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import Any, Callable, Coroutine, Optional
|
|
|
|
logger = logging.getLogger(__name__)

# Configuration
MAX_QUEUE_SIZE = 50  # Maximum queue depth before HTTP 429 is returned
MIN_PAUSE_SECONDS = 10  # Minimum pause between LLM calls (DashScope rate limit)
BACKOFF_BASE = 15  # Base for exponential backoff after failures (seconds)
BACKOFF_MAX = 300  # Backoff ceiling (5 minutes)

# In-memory queue and worker state. The queue holds
# (job_id, callback, args, kwargs) tuples; see enqueue().
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
# Handle of the single worker coroutine; None until start_worker() runs.
_worker_task: Optional[asyncio.Task] = None
# Counters exposed via get_queue_status(); mutated only by the worker.
_stats: dict = {
    "processed": 0,
    "failed": 0,
    "started_at": None,
    "last_completed_at": None,
    "avg_duration": 60.0,  # Initial estimate: 60s per job (EMA-updated by the worker)
}
|
|
|
|
|
|
class QueueFullError(Exception):
    """Raised when the queue is at capacity (``MAX_QUEUE_SIZE``).

    The API layer translates this into HTTP 429 with a Retry-After header.
    """
|
|
|
|
|
|
async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    **kwargs: Any,
) -> int:
    """Add a job to the queue.

    Args:
        job_id: Job identifier (used here for logging only).
        callback: Async callable the worker will await.
        *args: Positional arguments forwarded to ``callback``.
        **kwargs: Keyword arguments forwarded to ``callback``.

    Returns:
        The job's queue position (1-indexed) right after insertion.

    Raises:
        QueueFullError: If the queue already holds ``MAX_QUEUE_SIZE`` jobs.
    """
    try:
        _queue.put_nowait((job_id, callback, args, kwargs))
    except asyncio.QueueFull:
        # 'from None' suppresses the low-level QueueFull context so the
        # API error handler logs a clean domain-level exception.
        raise QueueFullError(
            f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen."
        ) from None
    # qsize() right after put_nowait() is this job's 1-indexed position:
    # both calls are synchronous, so no other coroutine can interleave.
    position = _queue.qsize()
    logger.info("Job %s enqueued at position %d", job_id, position)
    return position
|
|
|
|
|
|
def get_queue_status() -> dict:
    """Current queue status for the /api/queue/status endpoint."""
    waiting = _queue.qsize()
    mean_duration = _stats["avg_duration"]
    worker_alive = _worker_task is not None and not _worker_task.done()
    # Each pending job costs roughly one average run plus the mandatory pause.
    wait_estimate = waiting * (mean_duration + MIN_PAUSE_SECONDS)
    return {
        "pending": waiting,
        "max_size": MAX_QUEUE_SIZE,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(wait_estimate),
        "avg_job_duration_seconds": round(mean_duration, 1),
        "worker_running": worker_alive,
    }
|
|
|
|
|
|
async def _worker():
    """Single worker coroutine — processes jobs sequentially."""
    logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS)
    _stats["started_at"] = time.time()
    fail_streak = 0

    while True:
        job_id, callback, args, kwargs = await _queue.get()
        started = time.time()

        try:
            logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize())
            await callback(*args, **kwargs)
        except Exception:
            _stats["failed"] += 1
            fail_streak += 1
            logger.exception("Job %s failed", job_id)

            # Exponential backoff on repeated failures (e.g. LLM throttling):
            # 15s, 30s, 60s, ... capped at BACKOFF_MAX.
            if fail_streak > 1:
                backoff = min(BACKOFF_BASE * (2 ** (fail_streak - 2)), BACKOFF_MAX)
                logger.warning("Backoff %ds after %d consecutive failures", backoff, fail_streak)
                await asyncio.sleep(backoff)
        else:
            elapsed = time.time() - started
            _stats["processed"] += 1
            _stats["last_completed_at"] = time.time()
            # Exponential moving average of job duration (feeds the wait estimate).
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (elapsed * 0.2)
            fail_streak = 0
            logger.info("Job %s completed in %.1fs", job_id, elapsed)
        finally:
            _queue.task_done()

        # Minimum pause between jobs (DashScope-friendly).
        await asyncio.sleep(MIN_PAUSE_SECONDS)
|
|
|
|
|
|
def start_worker() -> asyncio.Task:
    """Start the worker coroutine. Call from FastAPI startup event."""
    global _worker_task
    already_running = _worker_task is not None and not _worker_task.done()
    if already_running:
        # Idempotent: a second call returns the existing task untouched.
        logger.warning("Worker already running, not starting a second one")
    else:
        _worker_task = asyncio.create_task(_worker())
    return _worker_task
|
|
|
|
|
|
async def re_enqueue_pending():
    """Mark jobs left in status='queued' by a previous run as 'stale'.

    Called at startup to recover from container restarts. We cannot
    actually re-enqueue the jobs because the original callback and its
    parameters are not persisted — the jobs are flagged 'stale' instead
    and the user must re-trigger them via the UI.
    """
    # Local imports keep startup cost out of module import time and avoid
    # a circular import with the app config.
    import aiosqlite

    from .config import settings

    # Single connection for both the SELECT and the UPDATE (the original
    # opened the database twice and re-imported aiosqlite mid-function).
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
        )
        queued = await cursor.fetchall()

        if not queued:
            return

        logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
        # One batched UPDATE instead of a per-row execute loop.
        await db.executemany(
            "UPDATE jobs SET status = 'stale', updated_at = datetime('now') WHERE id = ?",
            [(row["id"],) for row in queued],
        )
        await db.commit()
        logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
|