gwoe-antragspruefer/app/queue.py
Dotty Dotter 289d37a84b #95 Job-Queue: SQLite-backed asyncio Worker mit Backpressure
FIFO-Queue für Analyse-Jobs — ersetzt FastAPI BackgroundTasks:

app/queue.py:
- asyncio.Queue mit MAX_QUEUE_SIZE=50
- Einzelner Worker-Coroutine (Concurrency=1, DashScope-freundlich)
- MIN_PAUSE_SECONDS=10 zwischen Jobs
- Exponentielles Backoff bei Serien-Fehlern (15s → 5min)
- get_queue_status() für den Status-Endpoint
- QueueFullError → HTTP 429 + Retry-After Header
- start_worker() als FastAPI-Startup-Task
- re_enqueue_pending() markiert Crash-Überlebende als 'stale'

main.py:
- POST /api/analyze-drucksache nutzt queue.enqueue() statt
  background_tasks.add_task()
- Response enthält queue_position
- GET /api/queue/status zeigt pending, max_size, processed,
  estimated_wait_seconds, worker_running
- Worker wird bei app.startup() gestartet

Tests: 201 passed, 5 skipped.

Refs: #95, #44 (Batch baut auf Queue auf)
2026-04-10 17:24:34 +02:00

166 lines
5.7 KiB
Python

"""Analysis job queue with single-worker processing (#95).
Implements a FIFO queue backed by the existing ``jobs`` SQLite table
with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
any ``status='queued'`` jobs from a previous run are re-enqueued.
Architecture:
- ``enqueue(job_id, callback, *args)`` → puts a job on the queue
- A single worker coroutine processes jobs sequentially (concurrency=1)
with a configurable pause between calls (default 10s, DashScope-friendly)
- ``get_queue_status()`` returns position count + estimated wait
- ``MAX_QUEUE_SIZE`` limits backpressure → HTTP 429 when full
The worker is started via ``start_worker()`` which should be called
from the FastAPI startup event.
"""
import asyncio
import logging
import time
from typing import Any, Callable, Coroutine, Optional
logger = logging.getLogger(__name__)
# Configuration
MAX_QUEUE_SIZE = 50  # Maximum queue depth before HTTP 429 is returned
MIN_PAUSE_SECONDS = 10  # Minimum pause between LLM calls (DashScope rate limit)
BACKOFF_BASE = 15  # Exponential backoff base on failures (seconds)
BACKOFF_MAX = 300  # Maximum backoff (5 minutes)

# In-memory FIFO feeding the single worker coroutine
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
# Handle of the running worker task (None until start_worker() is called)
_worker_task: Optional[asyncio.Task] = None
# Mutable counters read by get_queue_status() and updated by the worker
_stats = {
    "processed": 0,
    "failed": 0,
    "started_at": None,
    "last_completed_at": None,
    "avg_duration": 60.0,  # initial estimate: 60s per job; refined by moving average
}
class QueueFullError(Exception):
    """Signals that the analysis queue already holds ``MAX_QUEUE_SIZE`` jobs."""
async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    **kwargs: Any,
) -> int:
    """Put a job onto the FIFO queue and report its 1-indexed position.

    Raises ``QueueFullError`` when the queue is already at capacity
    (``MAX_QUEUE_SIZE`` entries).
    """
    item = (job_id, callback, args, kwargs)
    try:
        _queue.put_nowait(item)
    except asyncio.QueueFull:
        raise QueueFullError(
            f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen."
        )
    slot = _queue.qsize()
    logger.info("Job %s enqueued at position %d", job_id, slot)
    return slot
def get_queue_status() -> dict:
    """Snapshot of queue depth, throughput counters and wait estimate.

    Consumed by the /api/queue/status endpoint.
    """
    depth = _queue.qsize()
    mean_duration = _stats["avg_duration"]
    # Each pending job costs roughly one average run plus the mandatory pause.
    wait_estimate = depth * (mean_duration + MIN_PAUSE_SECONDS)
    worker_alive = _worker_task is not None and not _worker_task.done()
    return {
        "pending": depth,
        "max_size": MAX_QUEUE_SIZE,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(wait_estimate),
        "avg_job_duration_seconds": round(mean_duration, 1),
        "worker_running": worker_alive,
    }
async def _worker():
    """Single worker coroutine — processes jobs sequentially.

    Pulls (job_id, callback, args, kwargs) tuples off ``_queue`` forever,
    runs one job at a time, and sleeps ``MIN_PAUSE_SECONDS`` between jobs.
    Consecutive failures trigger exponential backoff (e.g. LLM throttling).
    """
    logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS)
    _stats["started_at"] = time.time()
    failure_streak = 0
    while True:
        job_id, callback, args, kwargs = await _queue.get()
        started = time.time()
        try:
            logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize())
            await callback(*args, **kwargs)
        except Exception:
            _stats["failed"] += 1
            failure_streak += 1
            logger.exception("Job %s failed", job_id)
            # Exponential backoff on repeated failures (e.g. LLM throttling)
            if failure_streak > 1:
                backoff = min(BACKOFF_BASE * (2 ** (failure_streak - 2)), BACKOFF_MAX)
                logger.warning("Backoff %ds after %d consecutive failures", backoff, failure_streak)
                await asyncio.sleep(backoff)
        else:
            elapsed = time.time() - started
            _stats["processed"] += 1
            _stats["last_completed_at"] = time.time()
            # Exponential moving average of job duration (feeds wait estimate)
            _stats["avg_duration"] = 0.8 * _stats["avg_duration"] + 0.2 * elapsed
            failure_streak = 0
            logger.info("Job %s completed in %.1fs", job_id, elapsed)
        finally:
            _queue.task_done()
        # Minimum pause between jobs (DashScope-friendly)
        await asyncio.sleep(MIN_PAUSE_SECONDS)
def start_worker() -> asyncio.Task:
    """Launch the single worker coroutine; call from the FastAPI startup event.

    Idempotent: if a worker task is already alive, it is returned unchanged.
    """
    global _worker_task
    already_running = _worker_task is not None and not _worker_task.done()
    if already_running:
        logger.warning("Worker already running, not starting a second one")
    else:
        _worker_task = asyncio.create_task(_worker())
    return _worker_task
async def re_enqueue_pending():
    """Mark jobs left in status='queued' by a previous run as 'stale'.

    Called at startup to recover from container restarts. The original
    callback and its arguments are not persisted, so these jobs cannot be
    replayed here — they are flagged 'stale' instead and the user must
    re-trigger them via the UI.

    Fixes over the previous version: the duplicate ``import aiosqlite`` is
    removed, the SELECT and UPDATE share one connection instead of opening
    a second one, and the per-row UPDATE loop is replaced by a single
    ``executemany`` call.
    """
    # Local imports: only needed during startup recovery.
    import aiosqlite

    from .config import settings

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
        )
        queued = await cursor.fetchall()
        if not queued:
            return
        logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
        # We can't re-enqueue with the original callback here because we don't
        # know the original parameters. Mark them as 'stale' instead — the user
        # can re-trigger via the UI.
        await db.executemany(
            "UPDATE jobs SET status = 'stale', updated_at = datetime('now') WHERE id = ?",
            [(row["id"],) for row in queued],
        )
        await db.commit()
    logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))