"""Tests for app/queue.py — Job queue with single-worker processing (#95).""" import asyncio import pytest from app.queue import ( enqueue, get_queue_status, QueueFullError, _queue, _stats, ) @pytest.fixture(autouse=True) def clean_queue(): """Drain the queue before each test.""" while not _queue.empty(): try: _queue.get_nowait() except asyncio.QueueEmpty: break _stats["processed"] = 0 _stats["failed"] = 0 yield class TestEnqueue: @pytest.mark.asyncio async def test_enqueue_returns_position(self): async def noop(): pass pos = await enqueue("j1", noop) assert pos == 1 @pytest.mark.asyncio async def test_enqueue_increments_position(self): async def noop(): pass await enqueue("j1", noop) pos2 = await enqueue("j2", noop) assert pos2 == 2 @pytest.mark.asyncio async def test_queue_full_raises(self): async def noop(): pass # Fill queue to capacity for i in range(50): await enqueue(f"fill-{i}", noop) with pytest.raises(QueueFullError): await enqueue("overflow", noop) class TestGetQueueStatus: def test_empty_queue_status(self): status = get_queue_status() assert status["pending"] == 0 assert status["max_size"] == 50 assert status["estimated_wait_seconds"] == 0 @pytest.mark.asyncio async def test_status_reflects_pending(self): async def noop(): pass await enqueue("j1", noop) await enqueue("j2", noop) status = get_queue_status() assert status["pending"] == 2 assert status["estimated_wait_seconds"] > 0 # ─── Coverage-Backfill (#134) — Worker, Shutdown, Re-Enqueue ───────────────── class TestStartWorker: @pytest.mark.asyncio async def test_creates_tasks_for_concurrency(self): """start_worker erzeugt CONCURRENCY viele Tasks.""" from app import queue as q # Reset _worker_tasks q._worker_tasks.clear() try: tasks = q.start_worker() assert len(tasks) == q.CONCURRENCY assert all(t is not None for t in tasks) finally: # Cleanup: cancel + clear for t in q._worker_tasks: t.cancel() q._worker_tasks.clear() @pytest.mark.asyncio async def test_does_not_replace_running_workers(self): """Wenn start_worker zweimal aufgerufen wird, werden lebende Tasks nicht durch neue ersetzt.""" from app import queue as q q._worker_tasks.clear() try: first = q.start_worker() first_ids = [id(t) for t in first] second = q.start_worker() second_ids = [id(t) for t in second] # Tasks bleiben dieselben Instanzen assert first_ids == second_ids finally: for t in q._worker_tasks: t.cancel() q._worker_tasks.clear() class TestGracefulShutdown: @pytest.mark.asyncio async def test_no_processing_jobs_returns_immediately(self): """Mit leerem _jobs-State sollte graceful_shutdown sofort zurueckkehren.""" from app import queue as q q._jobs.clear() # set _shutting_down zurueck q._shutting_down = False try: import time t0 = time.time() await q.graceful_shutdown(timeout=5) assert time.time() - t0 < 1.0 # Sofort assert q._shutting_down is True finally: q._shutting_down = False @pytest.mark.asyncio async def test_waits_for_processing_jobs(self): """Mit einem 'processing'-Job wartet shutdown bis er fertig ist.""" from app import queue as q import asyncio as _asyncio q._jobs.clear() q._jobs["job1"] = {"status": "processing"} q._shutting_down = False async def finish_job_after_delay(): await _asyncio.sleep(0.05) q._jobs["job1"]["status"] = "completed" try: await _asyncio.gather( q.graceful_shutdown(timeout=5), finish_job_after_delay(), ) assert q._shutting_down is True finally: q._jobs.clear() q._shutting_down = False @pytest.mark.asyncio async def test_timeout_logs_remaining(self, caplog): """Wenn Job nach Timeout noch processing ist, wird ERROR geloggt.""" import logging from app import queue as q q._jobs.clear() q._jobs["stuck"] = {"status": "processing"} q._shutting_down = False try: with caplog.at_level(logging.ERROR, logger="app.queue"): await q.graceful_shutdown(timeout=1) assert any("Timeout" in r.message for r in caplog.records) finally: q._jobs.clear() q._shutting_down = False class TestEnqueueShuttingDown: @pytest.mark.asyncio async def test_enqueue_blocked_during_shutdown(self): from app import queue as q q._shutting_down = True try: with pytest.raises(q.QueueFullError, match="Server wird neu gestartet"): await q.enqueue("job-x", lambda: None) finally: q._shutting_down = False