"""Analysis job queue with single-worker processing (#95). Implements a FIFO queue backed by the existing ``jobs`` SQLite table with an in-memory ``asyncio.Queue`` for the worker loop. On startup, any ``status='queued'`` jobs from a previous run are re-enqueued. Architecture: - ``enqueue(job_id, callback, *args)`` → puts a job on the queue - A single worker coroutine processes jobs sequentially (concurrency=1) with a configurable pause between calls (default 10s, DashScope-friendly) - ``get_queue_status()`` returns position count + estimated wait - ``MAX_QUEUE_SIZE`` limits backpressure → HTTP 429 when full The worker is started via ``start_worker()`` which should be called from the FastAPI startup event. """ import asyncio import logging import time from typing import Any, Callable, Coroutine, Optional logger = logging.getLogger(__name__) # Konfiguration MAX_QUEUE_SIZE = 50 # Maximale Queue-Tiefe bevor 429 zurückgegeben wird MIN_PAUSE_SECONDS = 10 # Mindest-Pause zwischen LLM-Calls (DashScope Rate-Limit) BACKOFF_BASE = 15 # Exponentielles Backoff bei Fehlern (Sekunden) BACKOFF_MAX = 300 # Maximales Backoff (5 Minuten) # In-Memory Queue _queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) _worker_task: Optional[asyncio.Task] = None _stats = { "processed": 0, "failed": 0, "started_at": None, "last_completed_at": None, "avg_duration": 60.0, # Initialer Schätzwert: 60s pro Job } class QueueFullError(Exception): """Raised when the queue is at capacity.""" pass async def enqueue( job_id: str, callback: Callable[..., Coroutine], *args: Any, **kwargs: Any, ) -> int: """Add a job to the queue. Returns the queue position (1-indexed). Raises ``QueueFullError`` if the queue is at capacity. """ try: _queue.put_nowait((job_id, callback, args, kwargs)) except asyncio.QueueFull: raise QueueFullError( f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen." ) position = _queue.qsize() logger.info("Job %s enqueued at position %d", job_id, position) return position def get_queue_status() -> dict: """Current queue status for the /api/queue/status endpoint.""" pending = _queue.qsize() avg = _stats["avg_duration"] estimated_wait = pending * (avg + MIN_PAUSE_SECONDS) return { "pending": pending, "max_size": MAX_QUEUE_SIZE, "processed_total": _stats["processed"], "failed_total": _stats["failed"], "estimated_wait_seconds": round(estimated_wait), "avg_job_duration_seconds": round(avg, 1), "worker_running": _worker_task is not None and not _worker_task.done(), } async def _worker(): """Single worker coroutine — processes jobs sequentially.""" logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS) _stats["started_at"] = time.time() consecutive_failures = 0 while True: job_id, callback, args, kwargs = await _queue.get() t0 = time.time() try: logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize()) await callback(*args, **kwargs) duration = time.time() - t0 _stats["processed"] += 1 _stats["last_completed_at"] = time.time() # Gleitender Durchschnitt der Job-Dauer _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2) consecutive_failures = 0 logger.info("Job %s completed in %.1fs", job_id, duration) except Exception: _stats["failed"] += 1 consecutive_failures += 1 logger.exception("Job %s failed", job_id) # Exponentielles Backoff bei Serien-Fehlern (z.B. 
            if consecutive_failures > 1:
                backoff = min(
                    BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX
                )
                logger.warning(
                    "Backoff %ds after %d consecutive failures",
                    backoff,
                    consecutive_failures,
                )
                await asyncio.sleep(backoff)
        finally:
            _queue.task_done()

        # Minimum pause between jobs (DashScope-friendly)
        await asyncio.sleep(MIN_PAUSE_SECONDS)


def start_worker() -> asyncio.Task:
    """Start the worker coroutine. Call from the FastAPI startup event."""
    global _worker_task
    if _worker_task is not None and not _worker_task.done():
        logger.warning("Worker already running, not starting a second one")
        return _worker_task
    _worker_task = asyncio.create_task(_worker())
    return _worker_task


async def re_enqueue_pending():
    """Recover jobs with status='queued' from a previous run.

    Called at startup to recover from container restarts. Only picks up jobs
    that were queued but never started processing.
    """
    import aiosqlite

    from .config import settings

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
        )
        queued = await cursor.fetchall()

    if not queued:
        return

    logger.info("Found %d pending jobs from previous run", len(queued))

    # We can't re-enqueue with the original callback here because we don't
    # know the original parameters. Mark them as 'stale' instead; the user
    # can re-trigger via the UI.
    async with aiosqlite.connect(settings.db_path) as db:
        for row in queued:
            await db.execute(
                "UPDATE jobs SET status = 'stale', updated_at = datetime('now') WHERE id = ?",
                (row["id"],),
            )
        await db.commit()

    logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
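
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed): one way the FastAPI app
# could wire this module in, per the module docstring. ``app`` and the
# ``analyze_document`` callback are hypothetical names; the real wiring lives
# in the application module.
#
#     from fastapi import FastAPI, HTTPException
#
#     app = FastAPI()
#
#     @app.on_event("startup")
#     async def _startup():
#         await re_enqueue_pending()
#         start_worker()
#
#     @app.post("/api/analyze", status_code=202)
#     async def analyze(doc_id: str):
#         try:
#             position = await enqueue(doc_id, analyze_document, doc_id)
#         except QueueFullError as exc:
#             raise HTTPException(status_code=429, detail=str(exc))
#         return {"job_id": doc_id, "queue_position": position}
# ---------------------------------------------------------------------------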