#99 Queue: 3 parallel workers + job visualization + admin protection

Queue (queue.py):
- QUEUE_CONCURRENCY env var (default 3) instead of the hardcoded 1
- N worker coroutines as plain asyncio tasks (no semaphore; each worker
  pulls from the queue on its own, see the sketch after this list)
- Per-job tracking: job_id → {status, drucksache, duration, error}
- get_queue_status() returns a jobs array for the UI table
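
The worker-pool pattern in isolation, as a minimal self-contained sketch
(the names here are illustrative, not the project's own):

    import asyncio

    async def worker(queue: asyncio.Queue) -> None:
        # Each worker loops forever and pulls the next job itself;
        # the queue is the only synchronization point, no semaphore needed.
        while True:
            job = await queue.get()
            try:
                await job()
            finally:
                queue.task_done()

    async def main() -> None:
        queue: asyncio.Queue = asyncio.Queue(maxsize=50)
        workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
        for _ in range(6):
            # Dummy jobs; in the real code this is run_drucksache_analysis
            await queue.put(lambda: asyncio.sleep(0.1))
        await queue.join()  # blocks until every job has been task_done()
        for t in workers:
            t.cancel()

    asyncio.run(main())

Scaling up is then purely an environment change (e.g. exporting
QUEUE_CONCURRENCY=5 before starting the app); the launch command itself is
deployment-specific and not part of this commit.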

Visualization (index.html):
- Progress bar (X/Y done, green)
- Job table: Drucksache + status icon + duration
- Completed jobs clickable → detail view
- Auto-refresh every 3 s

Admin protection (auth.py + main.py):
- New require_admin dependency: checks for the Keycloak role "admin" or
  "gwoe-admin" (token role extraction sketched after this list). Passes
  through in dev mode.
- Batch analysis, programme indexing, assessment deletion: require_admin
- Single-document analysis, bookmarks, comments: keep require_auth
- Keycloak: role "admin" created + assigned to user tobias
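
How user["roles"] gets populated is not part of this diff. In a Keycloak
access token, realm-level roles conventionally sit under realm_access.roles,
so the extraction presumably looks roughly like this (a sketch, not the
project's actual helper):

    def _extract_roles(claims: dict) -> list[str]:
        # Keycloak: realm-level roles live under realm_access.roles;
        # client-scoped roles would sit under resource_access.<client-id>.roles.
        return claims.get("realm_access", {}).get("roles", [])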

Tests: 206 passed.

Refs: #99
Dotty Dotter 2026-04-10 23:15:42 +02:00
parent 5f5d9edf83
commit d24949740b
4 changed files with 147 additions and 81 deletions

View File: auth.py

@@ -205,6 +205,29 @@ async def require_auth(request: Request) -> dict:
     return user
 
 
+async def require_admin(request: Request) -> dict:
+    """Admin auth: returns the user dict or raises HTTP 403.
+
+    Checks whether the user has the 'admin' or 'gwoe-admin' role.
+    In dev mode (auth disabled): pass through.
+
+    Used for: batch analysis, programme indexing, assessment deletion.
+    """
+    if not _is_auth_enabled():
+        return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]}
+
+    user = await require_auth(request)
+    roles = user.get("roles", [])
+    if "admin" in roles or "gwoe-admin" in roles:
+        return user
+    raise HTTPException(
+        status_code=403,
+        detail="Admin-Berechtigung erforderlich",
+    )
+
+
 # ─────────────────────────────────────────────────────────────────────────────
 # Auth-Info-Endpoint
 # ─────────────────────────────────────────────────────────────────────────────
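
A quick way to pin this behaviour down in a test: fake a logged-in non-admin
and assert the 403. A sketch under assumptions — the module paths (app.main,
app.auth) and the Drucksache id are illustrative, not taken from this
repository. Patching require_auth on the auth module works because
require_admin looks the name up as a module global at call time:

    from fastapi.testclient import TestClient

    import app.auth as auth          # hypothetical module path
    from app.main import app as api  # hypothetical module path

    def test_delete_requires_admin(monkeypatch):
        async def fake_require_auth(request):
            # Logged in, but without "admin" / "gwoe-admin"
            return {"sub": "u1", "email": "", "name": "U", "roles": ["user"]}

        monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
        monkeypatch.setattr(auth, "require_auth", fake_require_auth)

        resp = TestClient(api).delete(
            "/api/assessment/delete",
            params={"drucksache": "20/1234"},  # hypothetical id
        )
        assert resp.status_code == 403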

View File: main.py

@@ -40,7 +40,7 @@ from .database import (
 from .parlamente import get_adapter, ADAPTERS
 from .bundeslaender import alle_bundeslaender
 from .analyzer import analyze_antrag
-from .auth import get_current_user, require_auth, keycloak_login_url, _is_auth_enabled
+from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, _is_auth_enabled
 
 
 def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str:
@@ -487,7 +487,7 @@ async def get_single_assessment(drucksache: str):
 @app.delete("/api/assessment/delete")
 async def delete_assessment_endpoint(
     drucksache: str,
-    user: dict = Depends(require_auth),
+    user: dict = Depends(require_admin),
 ):
     """Deletes an assessment so it can be re-analyzed."""
     drucksache = validate_drucksache(drucksache)
@@ -635,7 +635,7 @@ async def batch_analyze(
     request: Request,
     bundesland: str = Form(...),
     limit: int = Form(10),
-    user: dict = Depends(require_auth),
+    user: dict = Depends(require_admin),
 ):
     """Searches the Landtag portal for the latest Drucksachen and enqueues
     all that have not yet been assessed in the DB.
@@ -676,6 +676,7 @@ async def batch_analyze(
             job_id,
             run_drucksache_analysis,
             job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
+            drucksache=doc.drucksache,
         )
         enqueued.append({
             "drucksache": doc.drucksache,
@@ -738,6 +739,7 @@ async def analyze_drucksache(
             job_id,
             run_drucksache_analysis,
             job_id, drucksache, text, bundesland, model, doc,
+            drucksache=drucksache,
         )
     except QueueFullError:
         await update_job(job_id, status="rejected", error="Queue voll")
@@ -1066,7 +1068,7 @@ async def index_programme(
     background_tasks: BackgroundTasks,
     programm_id: str = Form(None),
     all_programmes: bool = Form(False),
-    user: dict = Depends(require_auth),
+    user: dict = Depends(require_admin),
 ):
     """Index programme(s) for semantic search."""
     pdf_dir = static_dir / "referenzen"

View File: queue.py

@@ -1,47 +1,39 @@
-"""Analysis job queue with single-worker processing (#95).
+"""Analysis job queue with configurable parallel workers (#95, #99).
 
-Implements a FIFO queue backed by the existing ``jobs`` SQLite table
-with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
-any ``status='queued'`` jobs from a previous run are re-enqueued.
-
-Architecture:
-- ``enqueue(job_id, callback, *args)`` puts a job on the queue
-- A single worker coroutine processes jobs sequentially (concurrency=1)
-  with a configurable pause between calls (default 10s, DashScope-friendly)
-- ``get_queue_status()`` returns position count + estimated wait
-- ``MAX_QUEUE_SIZE`` limits backpressure: HTTP 429 when full
-
-The worker is started via ``start_worker()`` which should be called
-from the FastAPI startup event.
+Processes jobs via an asyncio.Queue with N concurrent workers, each of
+which pulls from the queue independently (no semaphore).
+Tracks per-job status for live UI visualization.
 """
 
 import asyncio
 import logging
+import os
 import time
 from typing import Any, Callable, Coroutine, Optional
 
 logger = logging.getLogger(__name__)
 
 # Configuration
-MAX_QUEUE_SIZE = 50  # maximum queue depth before returning HTTP 429
-MIN_PAUSE_SECONDS = 10  # minimum pause between LLM calls (DashScope rate limit)
-BACKOFF_BASE = 15  # exponential backoff on errors (seconds)
-BACKOFF_MAX = 300  # maximum backoff (5 minutes)
+MAX_QUEUE_SIZE = 50
+CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3"))
+MIN_PAUSE_SECONDS = 3  # pause per worker between jobs
+BACKOFF_BASE = 15
+BACKOFF_MAX = 300
 
-# In-memory queue
+# In-memory queue + job tracking
 _queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
-_worker_task: Optional[asyncio.Task] = None
+_worker_tasks: list[asyncio.Task] = []
 _stats = {
     "processed": 0,
     "failed": 0,
     "started_at": None,
-    "last_completed_at": None,
-    "avg_duration": 60.0,  # initial estimate: 60 s per job
+    "avg_duration": 60.0,
 }
 
+# Live job tracking: job_id → {status, drucksache, started_at, duration, error}
+_jobs: dict[str, dict] = {}
+_MAX_TRACKED_JOBS = 100  # oldest entries are discarded
+
 
 class QueueFullError(Exception):
-    """Raised when the queue is at capacity."""
     pass
 
 
@@ -49,112 +41,136 @@ async def enqueue(
     job_id: str,
     callback: Callable[..., Coroutine],
     *args: Any,
+    drucksache: str = "",
     **kwargs: Any,
 ) -> int:
-    """Add a job to the queue. Returns the queue position (1-indexed).
-
-    Raises ``QueueFullError`` if the queue is at capacity.
-    """
+    """Add a job to the queue. Returns queue position."""
     try:
         _queue.put_nowait((job_id, callback, args, kwargs))
     except asyncio.QueueFull:
-        raise QueueFullError(
-            f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen."
-        )
+        raise QueueFullError(f"Queue voll ({MAX_QUEUE_SIZE} Jobs).")
+
+    _jobs[job_id] = {
+        "status": "queued",
+        "drucksache": drucksache,
+        "enqueued_at": time.time(),
+        "started_at": None,
+        "duration": None,
+        "error": None,
+    }
+    # Trim the oldest tracked jobs
+    if len(_jobs) > _MAX_TRACKED_JOBS:
+        oldest = sorted(_jobs, key=lambda k: _jobs[k].get("enqueued_at", 0))
+        for k in oldest[:len(_jobs) - _MAX_TRACKED_JOBS]:
+            del _jobs[k]
+
     position = _queue.qsize()
-    logger.info("Job %s enqueued at position %d", job_id, position)
+    logger.info("Job %s enqueued at position %d (concurrency=%d)", job_id, position, CONCURRENCY)
     return position
 
 
 def get_queue_status() -> dict:
-    """Current queue status for the /api/queue/status endpoint."""
+    """Queue status + per-job details for UI visualization."""
     pending = _queue.qsize()
     avg = _stats["avg_duration"]
-    estimated_wait = pending * (avg + MIN_PAUSE_SECONDS)
+    # With N workers the remaining wait is divided between them
+    estimated_wait = (pending / max(CONCURRENCY, 1)) * (avg + MIN_PAUSE_SECONDS)
+    # The 30 most recently enqueued jobs, oldest first
+    recent = sorted(_jobs.items(), key=lambda kv: kv[1].get("enqueued_at", 0))[-30:]
     return {
         "pending": pending,
         "max_size": MAX_QUEUE_SIZE,
+        "concurrency": CONCURRENCY,
         "processed_total": _stats["processed"],
         "failed_total": _stats["failed"],
         "estimated_wait_seconds": round(estimated_wait),
         "avg_job_duration_seconds": round(avg, 1),
-        "worker_running": _worker_task is not None and not _worker_task.done(),
+        "workers_running": sum(1 for t in _worker_tasks if not t.done()),
+        "jobs": [{
+            "job_id": jid,
+            "drucksache": j.get("drucksache", ""),
+            "status": j["status"],
+            "duration": round(j["duration"], 1) if j.get("duration") else None,
+            "error": j.get("error"),
+        } for jid, j in recent],
     }
 
 
-async def _worker():
-    """Single worker coroutine: processes jobs sequentially."""
-    logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS)
-    _stats["started_at"] = time.time()
+async def _worker(worker_id: int):
+    """Worker coroutine: pulls jobs off the shared queue independently."""
+    logger.info("Worker %d started", worker_id)
     consecutive_failures = 0
     while True:
         job_id, callback, args, kwargs = await _queue.get()
         t0 = time.time()
+        if job_id in _jobs:
+            _jobs[job_id]["status"] = "processing"
+            _jobs[job_id]["started_at"] = t0
         try:
-            logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize())
+            logger.info("Worker %d processing %s (queue: %d)", worker_id, job_id, _queue.qsize())
             await callback(*args, **kwargs)
             duration = time.time() - t0
             _stats["processed"] += 1
-            _stats["last_completed_at"] = time.time()
-            # Moving average of job duration
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
             consecutive_failures = 0
-            logger.info("Job %s completed in %.1fs", job_id, duration)
-        except Exception:
+            if job_id in _jobs:
+                _jobs[job_id]["status"] = "completed"
+                _jobs[job_id]["duration"] = duration
+            logger.info("Worker %d completed %s in %.1fs", worker_id, job_id, duration)
+        except Exception as e:
             _stats["failed"] += 1
             consecutive_failures += 1
-            logger.exception("Job %s failed", job_id)
+            if job_id in _jobs:
+                _jobs[job_id]["status"] = "failed"
+                _jobs[job_id]["duration"] = time.time() - t0
+                _jobs[job_id]["error"] = str(e)[:100]
+            logger.exception("Worker %d failed %s", worker_id, job_id)
 
-            # Exponential backoff on repeated failures (e.g. LLM throttling)
             if consecutive_failures > 1:
                 backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)
-                logger.warning("Backoff %ds after %d consecutive failures", backoff, consecutive_failures)
+                logger.warning("Worker %d backoff %ds", worker_id, backoff)
                 await asyncio.sleep(backoff)
         finally:
             _queue.task_done()
 
-        # Minimum pause between jobs (DashScope-friendly)
         await asyncio.sleep(MIN_PAUSE_SECONDS)
 
 
-def start_worker() -> asyncio.Task:
-    """Start the worker coroutine. Call from FastAPI startup event."""
-    global _worker_task
-    if _worker_task is not None and not _worker_task.done():
-        logger.warning("Worker already running, not starting a second one")
-        return _worker_task
-    _worker_task = asyncio.create_task(_worker())
-    return _worker_task
+def start_worker() -> list[asyncio.Task]:
+    """Start N worker coroutines. Call from the FastAPI startup event."""
+    _stats["started_at"] = time.time()
+    for i in range(CONCURRENCY):
+        if i < len(_worker_tasks) and not _worker_tasks[i].done():
+            continue  # this worker slot is still running
+        task = asyncio.create_task(_worker(i))
+        if i < len(_worker_tasks):
+            _worker_tasks[i] = task
+        else:
+            _worker_tasks.append(task)
+    logger.info("Queue: %d workers started (QUEUE_CONCURRENCY=%d)", CONCURRENCY, CONCURRENCY)
+    return _worker_tasks
 
 
 async def re_enqueue_pending():
-    """Re-enqueue jobs with status='queued' from a previous run.
-
-    Called at startup to recover from container restarts. Only picks up
-    jobs that were queued but never started processing.
-    """
+    """Mark stale 'queued' jobs from a previous run."""
     import aiosqlite
     from .config import settings
 
     async with aiosqlite.connect(settings.db_path) as db:
         db.row_factory = aiosqlite.Row
-        rows = await db.execute(
-            "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
-        )
+        rows = await db.execute("SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at")
        queued = await rows.fetchall()
 
     if not queued:
         return
 
-    logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
-    # We can't re-enqueue with the original callback here because we don't
-    # know the original parameters. Mark them as 'stale' instead; the user
-    # can re-trigger via the UI.
-    import aiosqlite
     async with aiosqlite.connect(settings.db_path) as db:
         for row in queued:
             await db.execute(
@@ -162,4 +178,4 @@ async def re_enqueue_pending():
                 (row["id"],),
             )
         await db.commit()
-    logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
+    logger.info("Marked %d stale jobs", len(queued))

View File: index.html

@ -1387,18 +1387,43 @@
@@ -1387,18 +1387,43 @@
 async function pollBatchQueue(statusEl) {
   for (let i = 0; i < 200; i++) {
-    await new Promise(r => setTimeout(r, 5000));
+    await new Promise(r => setTimeout(r, 3000));
     try {
       const qs = await fetch('/api/queue/status').then(r => r.json());
-      if (qs.pending === 0) {
-        statusEl.innerHTML += `<br><span style="color:var(--color-green);">✓ Alle Jobs abgeschlossen (${qs.processed_total} verarbeitet)</span>`;
-        loadAssessments(); // refresh the list
+
+      // Render the job table
+      const jobs = qs.jobs || [];
+      const jobsHtml = jobs.length > 0 ? `
+        <table style="width:100%;font-size:0.8rem;border-collapse:collapse;margin-top:0.5rem;">
+          <tr style="color:#888;"><td>Drucksache</td><td>Status</td><td>Dauer</td></tr>
+          ${jobs.map(j => {
+            const statusIcon = j.status === 'completed' ? '✅' : j.status === 'processing' ? '⏳' : j.status === 'failed' ? '❌' : '⏸';
+            const dur = j.duration ? j.duration + 's' : '';
+            const link = j.status === 'completed' ? `<a href="#" onclick="showMode('browse');setTimeout(()=>showDetail('${j.drucksache}'),100);return false;" style="color:var(--color-blue);">${j.drucksache}</a>` : j.drucksache;
+            return `<tr style="border-top:1px solid #f0f0f0;"><td>${link}</td><td>${statusIcon} ${j.status}</td><td>${dur}</td></tr>`;
+          }).join('')}
+        </table>` : '';
+
+      // Progress bar
+      const completed = jobs.filter(j => j.status === 'completed').length;
+      const total = jobs.length;
+      const pct = total > 0 ? Math.round(completed / total * 100) : 0;
+      statusEl.innerHTML = `
+        <div style="margin:0.5rem 0;">
+          <div style="background:#eee;border-radius:4px;height:8px;overflow:hidden;">
+            <div style="background:var(--color-green);height:100%;width:${pct}%;transition:width 0.5s;"></div>
+          </div>
+          <span style="font-size:0.8rem;color:#888;">${completed}/${total} fertig · ${qs.concurrency} Worker · ~${Math.round(qs.estimated_wait_seconds/60)} Min.</span>
+        </div>
+        ${jobsHtml}
+      `;
+
+      if (qs.pending === 0 && completed > 0) {
+        statusEl.innerHTML += `<br><span style="color:var(--color-green);">✓ Alle Jobs abgeschlossen</span>`;
+        loadAssessments();
         return;
       }
-      statusEl.querySelector('.queue-progress')?.remove();
-      statusEl.insertAdjacentHTML('beforeend',
-        `<span class="queue-progress"><br>⏳ ${qs.pending} Jobs in der Queue, ~${Math.round(qs.estimated_wait_seconds/60)} Min. verbleibend</span>`
-      );
     } catch { break; }
   }
 }
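
For reference, the payload this polling loop consumes from
/api/queue/status then has the following shape (all values illustrative,
matching the fields returned by get_queue_status() above):

    {
      "pending": 4,
      "max_size": 50,
      "concurrency": 3,
      "processed_total": 12,
      "failed_total": 1,
      "estimated_wait_seconds": 84,
      "avg_job_duration_seconds": 58.3,
      "workers_running": 3,
      "jobs": [
        {"job_id": "a1b2", "drucksache": "20/1234", "status": "completed", "duration": 61.4, "error": null},
        {"job_id": "c3d4", "drucksache": "20/1240", "status": "processing", "duration": null, "error": null}
      ]
    }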