#99 Queue: 3 parallele Worker + Job-Visualisierung + Admin-Schutz

Queue (queue.py):
- QUEUE_CONCURRENCY ENV (default 3) statt hartcodiert 1
- N Worker-Coroutines via asyncio tasks (nicht Semaphore — jeder
  Worker pickt eigenständig von der Queue)
- Per-Job-Tracking: job_id → {status, drucksache, duration, error}
- get_queue_status() liefert jobs-Array für UI-Tabelle

Visualisierung (index.html):
- Fortschrittsbalken (X/Y fertig, grün)
- Job-Tabelle: Drucksache + Status-Icon + Dauer
- Fertige Jobs klickbar → Detail-Ansicht
- Auto-Refresh alle 3s

Admin-Schutz (auth.py + main.py):
- Neue require_admin Dependency: prüft Keycloak-Rolle "admin" oder
  "gwoe-admin". Im Dev-Modus durchlassen.
- Batch-Analyse, Programme-Index, Assessment-Delete: require_admin
- Einzelanalyse, Bookmarks, Kommentare: bleiben require_auth
- Keycloak: Rolle "admin" erstellt + User tobias zugewiesen

Tests: 206 passed.

Refs: #99
This commit is contained in:
Dotty Dotter 2026-04-10 23:15:42 +02:00
parent 5f5d9edf83
commit d24949740b
4 changed files with 147 additions and 81 deletions

View File

@ -205,6 +205,29 @@ async def require_auth(request: Request) -> dict:
return user
async def require_admin(request: Request) -> dict:
    """Admin auth — returns the user dict or raises HTTP 403.

    Checks whether the authenticated user carries the Keycloak role
    'admin' or 'gwoe-admin'. In dev mode (auth disabled) the request is
    let through with a synthetic admin user.

    Used for: batch analysis, programme indexing, assessment deletion.

    Raises:
        HTTPException: 403 if the user lacks an admin role (401 may be
            raised earlier by ``require_auth``).
    """
    if not _is_auth_enabled():
        # Dev mode: synthesize an admin identity so downstream role
        # checks keep working without a Keycloak session.
        return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]}
    user = await require_auth(request)
    roles = user.get("roles", [])
    if "admin" in roles or "gwoe-admin" in roles:
        return user
    # No admin role: reject. (The unreachable trailing `return user`
    # after this raise was removed.)
    raise HTTPException(
        status_code=403,
        detail="Admin-Berechtigung erforderlich",
    )
# ─────────────────────────────────────────────────────────────────────────────
# Auth-Info-Endpoint
# ─────────────────────────────────────────────────────────────────────────────

View File

@ -40,7 +40,7 @@ from .database import (
from .parlamente import get_adapter, ADAPTERS
from .bundeslaender import alle_bundeslaender
from .analyzer import analyze_antrag
from .auth import get_current_user, require_auth, keycloak_login_url, _is_auth_enabled
from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, _is_auth_enabled
def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str:
@ -487,7 +487,7 @@ async def get_single_assessment(drucksache: str):
@app.delete("/api/assessment/delete")
async def delete_assessment_endpoint(
drucksache: str,
user: dict = Depends(require_auth),
user: dict = Depends(require_admin),
):
"""Löscht ein Assessment, damit es neu analysiert werden kann."""
drucksache = validate_drucksache(drucksache)
@ -635,7 +635,7 @@ async def batch_analyze(
request: Request,
bundesland: str = Form(...),
limit: int = Form(10),
user: dict = Depends(require_auth),
user: dict = Depends(require_admin),
):
"""Sucht die neuesten Drucksachen im Landtag-Portal und enqueued
alle, die noch nicht in der DB bewertet sind.
@ -676,6 +676,7 @@ async def batch_analyze(
job_id,
run_drucksache_analysis,
job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
drucksache=doc.drucksache,
)
enqueued.append({
"drucksache": doc.drucksache,
@ -738,6 +739,7 @@ async def analyze_drucksache(
job_id,
run_drucksache_analysis,
job_id, drucksache, text, bundesland, model, doc,
drucksache=drucksache,
)
except QueueFullError:
await update_job(job_id, status="rejected", error="Queue voll")
@ -1066,7 +1068,7 @@ async def index_programme(
background_tasks: BackgroundTasks,
programm_id: str = Form(None),
all_programmes: bool = Form(False),
user: dict = Depends(require_auth),
user: dict = Depends(require_admin),
):
"""Index programme(s) for semantic search."""
pdf_dir = static_dir / "referenzen"

View File

@ -1,47 +1,39 @@
"""Analysis job queue with single-worker processing (#95).
"""Analysis job queue with configurable parallel workers (#95, #99).
Implements a FIFO queue backed by the existing ``jobs`` SQLite table
with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
any ``status='queued'`` jobs from a previous run are re-enqueued.
Architecture:
- ``enqueue(job_id, callback, *args)`` puts a job on the queue
- A single worker coroutine processes jobs sequentially (concurrency=1)
with a configurable pause between calls (default 10s, DashScope-friendly)
- ``get_queue_status()`` returns position count + estimated wait
- ``MAX_QUEUE_SIZE`` limits backpressure HTTP 429 when full
The worker is started via ``start_worker()`` which should be called
from the FastAPI startup event.
Processes jobs via an asyncio.Queue with N concurrent workers (Semaphore).
Tracks per-job status for live UI visualization.
"""
import asyncio
import logging
import os
import time
from typing import Any, Callable, Coroutine, Optional
logger = logging.getLogger(__name__)
# Konfiguration
MAX_QUEUE_SIZE = 50 # Maximale Queue-Tiefe bevor 429 zurückgegeben wird
MIN_PAUSE_SECONDS = 10 # Mindest-Pause zwischen LLM-Calls (DashScope Rate-Limit)
BACKOFF_BASE = 15 # Exponentielles Backoff bei Fehlern (Sekunden)
BACKOFF_MAX = 300 # Maximales Backoff (5 Minuten)
MAX_QUEUE_SIZE = 50
CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3"))
MIN_PAUSE_SECONDS = 3 # Pause pro Worker zwischen Jobs
BACKOFF_BASE = 15
BACKOFF_MAX = 300
# In-Memory Queue
# In-Memory Queue + Job-Tracking
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
_worker_task: Optional[asyncio.Task] = None
_worker_tasks: list[asyncio.Task] = []
_stats = {
"processed": 0,
"failed": 0,
"started_at": None,
"last_completed_at": None,
"avg_duration": 60.0, # Initialer Schätzwert: 60s pro Job
"avg_duration": 60.0,
}
# Live Job-Tracking: job_id → {status, drucksache, started_at, duration, error}
_jobs: dict[str, dict] = {}
_MAX_TRACKED_JOBS = 100 # Älteste Jobs werden verworfen
class QueueFullError(Exception):
    """Raised when the queue is at capacity."""
    # (redundant `pass` after the docstring removed)
async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    drucksache: str = "",
    **kwargs: Any,
) -> int:
    """Add a job to the queue and register it for UI tracking.

    Args:
        job_id: Unique job identifier (also the tracking key).
        callback: Coroutine function a worker will await.
        *args / **kwargs: Forwarded to ``callback``.
        drucksache: Human-readable document number shown in the UI table.

    Returns:
        The queue position after insertion (1-indexed).

    Raises:
        QueueFullError: If the queue is at ``MAX_QUEUE_SIZE`` capacity.
    """
    try:
        _queue.put_nowait((job_id, callback, args, kwargs))
    except asyncio.QueueFull as exc:
        # Chain the original QueueFull so tracebacks show the real cause.
        raise QueueFullError(f"Queue voll ({MAX_QUEUE_SIZE} Jobs).") from exc
    _jobs[job_id] = {
        "status": "queued",
        "drucksache": drucksache,
        "enqueued_at": time.time(),
        "started_at": None,
        "duration": None,
        "error": None,
    }
    # Trim the oldest tracked jobs so the dict stays bounded.
    if len(_jobs) > _MAX_TRACKED_JOBS:
        oldest = sorted(_jobs, key=lambda k: _jobs[k].get("enqueued_at", 0))
        for k in oldest[:len(_jobs) - _MAX_TRACKED_JOBS]:
            del _jobs[k]
    position = _queue.qsize()
    logger.info("Job %s enqueued at position %d (concurrency=%d)", job_id, position, CONCURRENCY)
    return position
def get_queue_status() -> dict:
    """Queue status + per-job details for UI visualization.

    Returns:
        Dict with queue depth, worker count, processed/failed totals,
        an estimated wait time and the most recently enqueued jobs
        (newest first, capped at 30) for the UI table.
    """
    pending = _queue.qsize()
    avg = _stats["avg_duration"]
    # With N workers the pending jobs run in parallel, so the expected
    # wait divides by the worker count.
    estimated_wait = (pending / max(CONCURRENCY, 1)) * (avg + MIN_PAUSE_SECONDS)
    # Newest jobs first, capped at 30 rows. (Previously this sorted list
    # was computed but never used — the response now actually uses it.)
    recent_jobs = sorted(
        _jobs.items(), key=lambda kv: kv[1].get("enqueued_at", 0), reverse=True
    )[:30]
    return {
        "pending": pending,
        "max_size": MAX_QUEUE_SIZE,
        "concurrency": CONCURRENCY,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(estimated_wait),
        "avg_job_duration_seconds": round(avg, 1),
        "workers_running": sum(1 for t in _worker_tasks if not t.done()),
        "jobs": [{
            "job_id": jid,
            "drucksache": j.get("drucksache", ""),
            "status": j["status"],
            "duration": round(j["duration"], 1) if j.get("duration") else None,
            "error": j.get("error"),
        } for jid, j in recent_jobs],
    }
async def _worker(worker_id: int):
    """Worker coroutine — pulls jobs from the shared queue forever.

    Each of the N workers consumes independently from ``_queue`` (no
    semaphore involved). Updates ``_jobs`` tracking and ``_stats`` as
    jobs progress, and backs off exponentially on consecutive failures
    (e.g. LLM throttling).

    Args:
        worker_id: Index of this worker, used only for log messages.
    """
    logger.info("Worker %d started", worker_id)
    consecutive_failures = 0
    while True:
        job_id, callback, args, kwargs = await _queue.get()
        t0 = time.time()
        if job_id in _jobs:
            _jobs[job_id]["status"] = "processing"
            _jobs[job_id]["started_at"] = t0
        try:
            logger.info("Worker %d processing %s (queue: %d)", worker_id, job_id, _queue.qsize())
            await callback(*args, **kwargs)
            duration = time.time() - t0
            _stats["processed"] += 1
            _stats["last_completed_at"] = time.time()
            # Exponentially weighted moving average of job duration,
            # used for the wait-time estimate in get_queue_status().
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
            consecutive_failures = 0
            if job_id in _jobs:
                _jobs[job_id]["status"] = "completed"
                _jobs[job_id]["duration"] = duration
            logger.info("Worker %d completed %s in %.1fs", worker_id, job_id, duration)
        except Exception as e:
            _stats["failed"] += 1
            consecutive_failures += 1
            if job_id in _jobs:
                _jobs[job_id]["status"] = "failed"
                _jobs[job_id]["duration"] = time.time() - t0
                _jobs[job_id]["error"] = str(e)[:100]
            logger.exception("Worker %d failed %s", worker_id, job_id)
            # Exponential backoff on a failure streak (e.g. LLM throttling).
            if consecutive_failures > 1:
                backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)
                logger.warning("Worker %d backoff %ds", worker_id, backoff)
                await asyncio.sleep(backoff)
        finally:
            _queue.task_done()
            # Minimum pause between jobs per worker (rate-limit friendly).
            await asyncio.sleep(MIN_PAUSE_SECONDS)
def start_worker() -> list[asyncio.Task]:
    """Start up to ``CONCURRENCY`` worker coroutines.

    Idempotent: slots whose task is still running are left alone;
    finished/crashed workers are replaced. Call from the FastAPI
    startup event (requires a running event loop).

    Returns:
        The list of worker tasks.
    """
    # Note: no `global` needed — _worker_tasks is mutated in place,
    # never rebound.
    _stats["started_at"] = time.time()
    for i in range(CONCURRENCY):
        # Skip slots that already have a live worker.
        if i < len(_worker_tasks) and not _worker_tasks[i].done():
            continue
        task = asyncio.create_task(_worker(i))
        if i < len(_worker_tasks):
            _worker_tasks[i] = task
        else:
            _worker_tasks.append(task)
    logger.info("Queue: %d workers started (QUEUE_CONCURRENCY=%d)", CONCURRENCY, CONCURRENCY)
    return _worker_tasks
async def re_enqueue_pending():
"""Re-enqueue jobs with status='queued' from a previous run.
Called at startup to recover from container restarts. Only picks up
jobs that were queued but never started processing.
"""
"""Mark stale queued jobs from previous run."""
import aiosqlite
from .config import settings
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
rows = await db.execute(
"SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
)
rows = await db.execute("SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at")
queued = await rows.fetchall()
if not queued:
return
logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
# We can't re-enqueue with the original callback here because we don't
# know the original parameters. Mark them as 'stale' instead — the user
# can re-trigger via the UI.
import aiosqlite
async with aiosqlite.connect(settings.db_path) as db:
for row in queued:
await db.execute(
@ -162,4 +178,4 @@ async def re_enqueue_pending():
(row["id"],),
)
await db.commit()
logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
logger.info("Marked %d stale jobs", len(queued))

View File

@ -1387,18 +1387,43 @@
// Poll /api/queue/status every 3s (max ~10 minutes) and render a
// progress bar plus a per-job table into statusEl. Completed jobs link
// to their detail view. Stops when the queue is drained or on fetch error.
async function pollBatchQueue(statusEl) {
    for (let i = 0; i < 200; i++) {
        await new Promise(r => setTimeout(r, 3000));
        try {
            const qs = await fetch('/api/queue/status').then(r => r.json());
            const jobs = qs.jobs || [];
            // Per-job table: Drucksache | status icon | duration.
            const jobsHtml = jobs.length > 0 ? `
                <table style="width:100%;font-size:0.8rem;border-collapse:collapse;margin-top:0.5rem;">
                <tr style="color:#888;"><td>Drucksache</td><td>Status</td><td>Dauer</td></tr>
                ${jobs.map(j => {
                    const statusIcon = j.status === 'completed' ? '✅' : j.status === 'processing' ? '⏳' : j.status === 'failed' ? '❌' : '⏸';
                    const dur = j.duration ? j.duration + 's' : '';
                    // Completed jobs are clickable and open the detail view.
                    const link = j.status === 'completed' ? `<a href="#" onclick="showMode('browse');setTimeout(()=>showDetail('${j.drucksache}'),100);return false;" style="color:var(--color-blue);">${j.drucksache}</a>` : j.drucksache;
                    return `<tr style="border-top:1px solid #f0f0f0;"><td>${link}</td><td>${statusIcon} ${j.status}</td><td>${dur}</td></tr>`;
                }).join('')}
                </table>` : '';
            // Progress bar: completed / total among the tracked jobs.
            const completed = jobs.filter(j => j.status === 'completed').length;
            const total = jobs.length;
            const pct = total > 0 ? Math.round(completed / total * 100) : 0;
            statusEl.innerHTML = `
                <div style="margin:0.5rem 0;">
                <div style="background:#eee;border-radius:4px;height:8px;overflow:hidden;">
                <div style="background:var(--color-green);height:100%;width:${pct}%;transition:width 0.5s;"></div>
                </div>
                <span style="font-size:0.8rem;color:#888;">${completed}/${total} fertig · ${qs.concurrency} Worker · ~${Math.round(qs.estimated_wait_seconds/60)} Min.</span>
                </div>
                ${jobsHtml}
            `;
            if (qs.pending === 0 && completed > 0) {
                statusEl.innerHTML += `<br><span style="color:var(--color-green);">✓ Alle Jobs abgeschlossen</span>`;
                loadAssessments();
                return;
            }
        } catch { break; }
    }
}