# gwoe-antragspruefer/tests/test_queue.py

"""Tests for app/queue.py — Job queue with single-worker processing (#95)."""
import asyncio
import pytest
from app.queue import (
enqueue,
get_queue_status,
QueueFullError,
_queue,
_stats,
)
@pytest.fixture(autouse=True)
def clean_queue():
    """Empty the module-level queue and zero the counters before each test.

    Applied automatically to every test (``autouse=True``). Any items still
    present in ``_queue`` are discarded, and the ``"processed"`` and
    ``"failed"`` entries of ``_stats`` are reset to 0 before control is
    handed to the test.
    """
    while True:
        if _queue.empty():
            break
        try:
            _queue.get_nowait()
        except asyncio.QueueEmpty:
            # Nothing left to discard.
            break

    for counter in ("processed", "failed"):
        _stats[counter] = 0

    yield
class TestEnqueue:
    """Checks for the value returned by ``enqueue`` and its overflow error."""

    @pytest.mark.asyncio
    async def test_enqueue_returns_position(self):
        """The first job enqueued is reported at position 1."""

        async def noop():
            return None

        position = await enqueue("j1", noop)
        assert position == 1

    @pytest.mark.asyncio
    async def test_enqueue_increments_position(self):
        """A second job enqueued right after the first is reported at 2."""

        async def noop():
            return None

        await enqueue("j1", noop)
        second_position = await enqueue("j2", noop)
        assert second_position == 2

    @pytest.mark.asyncio
    async def test_queue_full_raises(self):
        """One more enqueue after 50 queued jobs raises QueueFullError."""

        async def noop():
            return None

        # Put 50 jobs in the queue, the capacity this test expects.
        filler_count = 50
        for i in range(filler_count):
            await enqueue(f"fill-{i}", noop)

        with pytest.raises(QueueFullError):
            await enqueue("overflow", noop)
class TestGetQueueStatus:
    """Checks for the mapping returned by ``get_queue_status``."""

    def test_empty_queue_status(self):
        """With nothing enqueued: pending 0, max_size 50, wait estimate 0."""
        snapshot = get_queue_status()
        assert snapshot["pending"] == 0
        assert snapshot["max_size"] == 50
        assert snapshot["estimated_wait_seconds"] == 0

    @pytest.mark.asyncio
    async def test_status_reflects_pending(self):
        """After two enqueues: pending is 2 and the wait estimate is > 0."""

        async def noop():
            return None

        for job_id in ("j1", "j2"):
            await enqueue(job_id, noop)

        snapshot = get_queue_status()
        assert snapshot["pending"] == 2
        assert snapshot["estimated_wait_seconds"] > 0