diff --git a/app/main.py b/app/main.py index 2a4fe4d..e326e8a 100644 --- a/app/main.py +++ b/app/main.py @@ -134,7 +134,7 @@ async def startup(): async def shutdown(): """Graceful Shutdown: warte auf laufende Queue-Jobs bevor der Container stirbt.""" from .queue import graceful_shutdown - await graceful_shutdown(timeout=300) + await graceful_shutdown(timeout=900) # 15 min — passend zu stop_grace_period # JSON import disabled - all assessments now live in SQLite DB only diff --git a/app/queue.py b/app/queue.py index 482e618..5a02a2f 100644 --- a/app/queue.py +++ b/app/queue.py @@ -15,7 +15,8 @@ logger = logging.getLogger(__name__) # Konfiguration MAX_QUEUE_SIZE = 50 CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3")) -MIN_PAUSE_SECONDS = 3 # Pause pro Worker zwischen Jobs +MIN_PAUSE_SECONDS = 3 +_shutting_down = False # Sperrt neue Jobs bei Graceful Shutdown BACKOFF_BASE = 15 BACKOFF_MAX = 300 @@ -45,6 +46,8 @@ async def enqueue( **kwargs: Any, ) -> int: """Add a job to the queue. Returns queue position.""" + if _shutting_down: + raise QueueFullError("Server wird neu gestartet. Bitte in Kürze erneut versuchen.") try: _queue.put_nowait((job_id, callback, args, kwargs)) except asyncio.QueueFull: @@ -81,6 +84,7 @@ def get_queue_status() -> dict: "pending": pending, "max_size": MAX_QUEUE_SIZE, "concurrency": CONCURRENCY, + "shutting_down": _shutting_down, "processed_total": _stats["processed"], "failed_total": _stats["failed"], "estimated_wait_seconds": round(estimated_wait), @@ -158,25 +162,41 @@ def start_worker() -> list[asyncio.Task]: return _worker_tasks -async def graceful_shutdown(timeout: int = 300): - """Wait for running jobs to finish before shutdown. +async def graceful_shutdown(timeout: int = 900): + """Graceful Shutdown: aktuell laufende Jobs beenden, Queue sperren. - Called from FastAPI shutdown event. Waits up to `timeout` seconds - for the queue to drain and workers to finish their current job. + 1. Sperrt neue Jobs (_shutting_down = True) + 2. Wartet bis alle gerade PROCESSING-Jobs fertig sind (max timeout) + 3. Queued-Jobs bleiben in der DB als 'stale' → User kann nach + Restart erneut triggern + + Timeout 15 min (900s) — ein einzelner LLM-Call dauert max ~120s, + bei 3 parallelen Workern also max ~120s reale Wartezeit. """ - pending = _queue.qsize() + global _shutting_down + _shutting_down = True + processing = sum(1 for j in _jobs.values() if j.get("status") == "processing") - if pending == 0 and processing == 0: - logger.info("Queue empty, shutdown immediately") + pending = _queue.qsize() + + if processing == 0: + logger.info("Graceful shutdown: keine laufenden Jobs, sofort beenden (%d queued verworfen)", pending) return - logger.warning("Graceful shutdown: waiting for %d pending + %d processing jobs (max %ds)", - pending, processing, timeout) - try: - await asyncio.wait_for(_queue.join(), timeout=timeout) - logger.info("Queue drained, shutdown clean") - except asyncio.TimeoutError: - logger.error("Graceful shutdown timeout after %ds, %d jobs still pending", timeout, _queue.qsize()) + logger.warning("Graceful shutdown: warte auf %d laufende Jobs (max %ds). %d queued werden beim Restart stale.", + processing, timeout, pending) + + # Warte nur auf die laufenden Jobs, nicht auf die ganze Queue + start = time.time() + while time.time() - start < timeout: + still_processing = sum(1 for j in _jobs.values() if j.get("status") == "processing") + if still_processing == 0: + logger.info("Graceful shutdown: alle laufenden Jobs beendet nach %.0fs", time.time() - start) + return + await asyncio.sleep(2) + + logger.error("Graceful shutdown: Timeout nach %ds, %d Jobs noch aktiv", + timeout, sum(1 for j in _jobs.values() if j.get("status") == "processing")) async def re_enqueue_pending(): diff --git a/docker-compose.yml b/docker-compose.yml index 257f01b..8df5731 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: build: . container_name: gwoe-antragspruefer restart: unless-stopped - stop_grace_period: 5m # Queue-Jobs zu Ende laufen lassen vor Kill + stop_grace_period: 15m # Laufende LLM-Jobs zu Ende laufen lassen environment: - DASHSCOPE_API_KEY=${DASHSCOPE_API_KEY} - KEYCLOAK_URL=https://sso.toppyr.de