diff --git a/tests/test_queue.py b/tests/test_queue.py index 2feeb0c..09eda55 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -66,3 +66,113 @@ class TestGetQueueStatus: status = get_queue_status() assert status["pending"] == 2 assert status["estimated_wait_seconds"] > 0 + + +# ─── Coverage-Backfill (#134) — Worker, Shutdown, Re-Enqueue ───────────────── + + +class TestStartWorker: + @pytest.mark.asyncio + async def test_creates_tasks_for_concurrency(self): + """start_worker erzeugt CONCURRENCY viele Tasks.""" + from app import queue as q + # Reset _worker_tasks + q._worker_tasks.clear() + try: + tasks = q.start_worker() + assert len(tasks) == q.CONCURRENCY + assert all(t is not None for t in tasks) + finally: + # Cleanup: cancel + clear + for t in q._worker_tasks: + t.cancel() + q._worker_tasks.clear() + + @pytest.mark.asyncio + async def test_does_not_replace_running_workers(self): + """Wenn start_worker zweimal aufgerufen wird, werden lebende Tasks + nicht durch neue ersetzt.""" + from app import queue as q + q._worker_tasks.clear() + try: + first = q.start_worker() + first_ids = [id(t) for t in first] + second = q.start_worker() + second_ids = [id(t) for t in second] + # Tasks bleiben dieselben Instanzen + assert first_ids == second_ids + finally: + for t in q._worker_tasks: + t.cancel() + q._worker_tasks.clear() + + +class TestGracefulShutdown: + @pytest.mark.asyncio + async def test_no_processing_jobs_returns_immediately(self): + """Mit leerem _jobs-State sollte graceful_shutdown sofort + zurueckkehren.""" + from app import queue as q + q._jobs.clear() + # set _shutting_down zurueck + q._shutting_down = False + try: + import time + t0 = time.time() + await q.graceful_shutdown(timeout=5) + assert time.time() - t0 < 1.0 # Sofort + assert q._shutting_down is True + finally: + q._shutting_down = False + + @pytest.mark.asyncio + async def test_waits_for_processing_jobs(self): + """Mit einem 'processing'-Job wartet shutdown bis er fertig ist.""" + from app import queue as q + import asyncio as _asyncio + q._jobs.clear() + q._jobs["job1"] = {"status": "processing"} + q._shutting_down = False + + async def finish_job_after_delay(): + await _asyncio.sleep(0.05) + q._jobs["job1"]["status"] = "completed" + + try: + await _asyncio.gather( + q.graceful_shutdown(timeout=5), + finish_job_after_delay(), + ) + assert q._shutting_down is True + finally: + q._jobs.clear() + q._shutting_down = False + + @pytest.mark.asyncio + async def test_timeout_logs_remaining(self, caplog): + """Wenn Job nach Timeout noch processing ist, wird ERROR geloggt.""" + import logging + from app import queue as q + q._jobs.clear() + q._jobs["stuck"] = {"status": "processing"} + q._shutting_down = False + + try: + with caplog.at_level(logging.ERROR, logger="app.queue"): + await q.graceful_shutdown(timeout=1) + assert any("Timeout" in r.message for r in caplog.records) + finally: + q._jobs.clear() + q._shutting_down = False + + +class TestEnqueueShuttingDown: + @pytest.mark.asyncio + async def test_enqueue_blocked_during_shutdown(self): + from app import queue as q + q._shutting_down = True + try: + with pytest.raises(q.QueueFullError, match="Server wird neu gestartet"): + await q.enqueue("job-x", lambda: None) + finally: + q._shutting_down = False