#99 Queue: 3 parallele Worker + Job-Visualisierung + Admin-Schutz
Queue (queue.py):
- QUEUE_CONCURRENCY ENV (default 3) statt hartcodiert 1
- N Worker-Coroutines via asyncio tasks (nicht Semaphore — jeder
Worker pickt eigenständig von der Queue)
- Per-Job-Tracking: job_id → {status, drucksache, duration, error}
- get_queue_status() liefert jobs-Array für UI-Tabelle
Visualisierung (index.html):
- Fortschrittsbalken (X/Y fertig, grün)
- Job-Tabelle: Drucksache + Status-Icon + Dauer
- Fertige Jobs klickbar → Detail-Ansicht
- Auto-Refresh alle 3s
Admin-Schutz (auth.py + main.py):
- Neue require_admin Dependency: prüft Keycloak-Rolle "admin" oder
"gwoe-admin". Im Dev-Modus durchlassen.
- Batch-Analyse, Programme-Index, Assessment-Delete: require_admin
- Einzelanalyse, Bookmarks, Kommentare: bleiben require_auth
- Keycloak: Rolle "admin" erstellt + User tobias zugewiesen
Tests: 206 passed.
Refs: #99
This commit is contained in:
parent
5f5d9edf83
commit
d24949740b
23
app/auth.py
23
app/auth.py
@ -205,6 +205,29 @@ async def require_auth(request: Request) -> dict:
|
||||
return user
|
||||
|
||||
|
||||
async def require_admin(request: Request) -> dict:
    """Admin auth — returns the user dict or raises HTTP 403.

    Checks whether the authenticated user carries the Keycloak role
    'admin' or 'gwoe-admin'. In dev mode (auth disabled) the request
    is let through with a synthetic admin identity.

    Used for: batch analysis, programme indexing, assessment deletion.

    Raises:
        HTTPException: 403 if the user lacks an admin role (401 may be
            raised earlier by ``require_auth`` for unauthenticated users).
    """
    # Dev mode: no Keycloak available — grant a synthetic admin identity
    # so local development is not blocked.
    if not _is_auth_enabled():
        return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]}

    # Delegate authentication; this raises on missing/invalid credentials.
    user = await require_auth(request)
    roles = user.get("roles", [])
    if "admin" in roles or "gwoe-admin" in roles:
        return user

    # Authenticated but not authorized.
    raise HTTPException(
        status_code=403,
        detail="Admin-Berechtigung erforderlich",
    )
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Auth-Info-Endpoint
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
10
app/main.py
10
app/main.py
@ -40,7 +40,7 @@ from .database import (
|
||||
from .parlamente import get_adapter, ADAPTERS
|
||||
from .bundeslaender import alle_bundeslaender
|
||||
from .analyzer import analyze_antrag
|
||||
from .auth import get_current_user, require_auth, keycloak_login_url, _is_auth_enabled
|
||||
from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, _is_auth_enabled
|
||||
|
||||
|
||||
def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str:
|
||||
@ -487,7 +487,7 @@ async def get_single_assessment(drucksache: str):
|
||||
@app.delete("/api/assessment/delete")
|
||||
async def delete_assessment_endpoint(
|
||||
drucksache: str,
|
||||
user: dict = Depends(require_auth),
|
||||
user: dict = Depends(require_admin),
|
||||
):
|
||||
"""Löscht ein Assessment, damit es neu analysiert werden kann."""
|
||||
drucksache = validate_drucksache(drucksache)
|
||||
@ -635,7 +635,7 @@ async def batch_analyze(
|
||||
request: Request,
|
||||
bundesland: str = Form(...),
|
||||
limit: int = Form(10),
|
||||
user: dict = Depends(require_auth),
|
||||
user: dict = Depends(require_admin),
|
||||
):
|
||||
"""Sucht die neuesten Drucksachen im Landtag-Portal und enqueued
|
||||
alle, die noch nicht in der DB bewertet sind.
|
||||
@ -676,6 +676,7 @@ async def batch_analyze(
|
||||
job_id,
|
||||
run_drucksache_analysis,
|
||||
job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
|
||||
drucksache=doc.drucksache,
|
||||
)
|
||||
enqueued.append({
|
||||
"drucksache": doc.drucksache,
|
||||
@ -738,6 +739,7 @@ async def analyze_drucksache(
|
||||
job_id,
|
||||
run_drucksache_analysis,
|
||||
job_id, drucksache, text, bundesland, model, doc,
|
||||
drucksache=drucksache,
|
||||
)
|
||||
except QueueFullError:
|
||||
await update_job(job_id, status="rejected", error="Queue voll")
|
||||
@ -1066,7 +1068,7 @@ async def index_programme(
|
||||
background_tasks: BackgroundTasks,
|
||||
programm_id: str = Form(None),
|
||||
all_programmes: bool = Form(False),
|
||||
user: dict = Depends(require_auth),
|
||||
user: dict = Depends(require_admin),
|
||||
):
|
||||
"""Index programme(s) for semantic search."""
|
||||
pdf_dir = static_dir / "referenzen"
|
||||
|
||||
154
app/queue.py
154
app/queue.py
@ -1,47 +1,39 @@
|
||||
"""Analysis job queue with single-worker processing (#95).
|
||||
"""Analysis job queue with configurable parallel workers (#95, #99).
|
||||
|
||||
Implements a FIFO queue backed by the existing ``jobs`` SQLite table
|
||||
with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
|
||||
any ``status='queued'`` jobs from a previous run are re-enqueued.
|
||||
|
||||
Architecture:
|
||||
- ``enqueue(job_id, callback, *args)`` → puts a job on the queue
|
||||
- A single worker coroutine processes jobs sequentially (concurrency=1)
|
||||
with a configurable pause between calls (default 10s, DashScope-friendly)
|
||||
- ``get_queue_status()`` returns position count + estimated wait
|
||||
- ``MAX_QUEUE_SIZE`` limits backpressure → HTTP 429 when full
|
||||
|
||||
The worker is started via ``start_worker()`` which should be called
|
||||
from the FastAPI startup event.
|
||||
Processes jobs via an asyncio.Queue with N concurrent workers (Semaphore).
|
||||
Tracks per-job status for live UI visualization.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Any, Callable, Coroutine, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Konfiguration
|
||||
MAX_QUEUE_SIZE = 50 # Maximale Queue-Tiefe bevor 429 zurückgegeben wird
|
||||
MIN_PAUSE_SECONDS = 10 # Mindest-Pause zwischen LLM-Calls (DashScope Rate-Limit)
|
||||
BACKOFF_BASE = 15 # Exponentielles Backoff bei Fehlern (Sekunden)
|
||||
BACKOFF_MAX = 300 # Maximales Backoff (5 Minuten)
|
||||
MAX_QUEUE_SIZE = 50
|
||||
CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3"))
|
||||
MIN_PAUSE_SECONDS = 3 # Pause pro Worker zwischen Jobs
|
||||
BACKOFF_BASE = 15
|
||||
BACKOFF_MAX = 300
|
||||
|
||||
# In-Memory Queue
|
||||
# In-Memory Queue + Job-Tracking
|
||||
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
|
||||
_worker_task: Optional[asyncio.Task] = None
|
||||
_worker_tasks: list[asyncio.Task] = []
|
||||
_stats = {
|
||||
"processed": 0,
|
||||
"failed": 0,
|
||||
"started_at": None,
|
||||
"last_completed_at": None,
|
||||
"avg_duration": 60.0, # Initialer Schätzwert: 60s pro Job
|
||||
"avg_duration": 60.0,
|
||||
}
|
||||
# Live Job-Tracking: job_id → {status, drucksache, started_at, duration, error}
|
||||
_jobs: dict[str, dict] = {}
|
||||
_MAX_TRACKED_JOBS = 100 # Älteste Jobs werden verworfen
|
||||
|
||||
|
||||
class QueueFullError(Exception):
|
||||
"""Raised when the queue is at capacity."""
|
||||
pass
|
||||
|
||||
|
||||
async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    drucksache: str = "",
    **kwargs: Any,
) -> int:
    """Add a job to the queue. Returns queue position (1-indexed).

    Args:
        job_id: unique identifier used for tracking and logging.
        callback: coroutine function executed by a worker.
        *args: positional arguments forwarded to ``callback``.
        drucksache: document number shown in the UI job table.
        **kwargs: keyword arguments forwarded to ``callback``.

    Raises:
        QueueFullError: if the queue already holds ``MAX_QUEUE_SIZE`` jobs.
    """
    try:
        _queue.put_nowait((job_id, callback, args, kwargs))
    except asyncio.QueueFull:
        raise QueueFullError(f"Queue voll ({MAX_QUEUE_SIZE} Jobs).")
    # Register the job for live UI tracking.
    _jobs[job_id] = {
        "status": "queued",
        "drucksache": drucksache,
        "enqueued_at": time.time(),
        "started_at": None,
        "duration": None,
        "error": None,
    }
    # Trim the oldest tracked jobs so the dict stays bounded.
    if len(_jobs) > _MAX_TRACKED_JOBS:
        oldest = sorted(_jobs, key=lambda k: _jobs[k].get("enqueued_at", 0))
        for k in oldest[:len(_jobs) - _MAX_TRACKED_JOBS]:
            del _jobs[k]
    position = _queue.qsize()
    logger.info("Job %s enqueued at position %d (concurrency=%d)", job_id, position, CONCURRENCY)
    return position
|
||||
|
||||
|
||||
def get_queue_status() -> dict:
    """Queue status + per-job details for UI visualization.

    Returns a dict with queue depth, worker count, throughput stats,
    an estimated wait time, and up to 30 of the most recently tracked
    jobs (insertion order, i.e. oldest-first of the last 30).
    """
    pending = _queue.qsize()
    avg = _stats["avg_duration"]
    # With N workers the total wait is divided between them.
    estimated_wait = (pending / max(CONCURRENCY, 1)) * (avg + MIN_PAUSE_SECONDS)

    return {
        "pending": pending,
        "max_size": MAX_QUEUE_SIZE,
        "concurrency": CONCURRENCY,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(estimated_wait),
        "avg_job_duration_seconds": round(avg, 1),
        "workers_running": sum(1 for t in _worker_tasks if not t.done()),
        # Last 30 tracked jobs; dict preserves insertion (= enqueue) order.
        "jobs": [{
            "job_id": jid,
            "drucksache": j.get("drucksache", ""),
            "status": j["status"],
            "duration": round(j["duration"], 1) if j.get("duration") else None,
            "error": j.get("error"),
        } for jid, j in list(_jobs.items())[-30:]],
    }
|
||||
|
||||
|
||||
async def _worker(worker_id: int):
    """Worker coroutine — pulls jobs from the shared queue and runs them.

    Each of the N workers loops forever: take a job, mark it
    'processing', await its callback, and record success/failure in
    ``_stats`` and ``_jobs``. Consecutive failures trigger exponential
    backoff (e.g. LLM throttling); a short pause follows every job.
    """
    logger.info("Worker %d started", worker_id)
    consecutive_failures = 0

    while True:
        job_id, callback, args, kwargs = await _queue.get()
        t0 = time.time()

        if job_id in _jobs:
            _jobs[job_id]["status"] = "processing"
            _jobs[job_id]["started_at"] = t0

        try:
            logger.info("Worker %d processing %s (queue: %d)", worker_id, job_id, _queue.qsize())
            await callback(*args, **kwargs)
            duration = time.time() - t0
            _stats["processed"] += 1
            _stats["last_completed_at"] = time.time()
            # Exponential moving average of job duration (for wait estimates).
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
            consecutive_failures = 0
            if job_id in _jobs:
                _jobs[job_id]["status"] = "completed"
                _jobs[job_id]["duration"] = duration
            logger.info("Worker %d completed %s in %.1fs", worker_id, job_id, duration)

        except Exception as e:
            _stats["failed"] += 1
            consecutive_failures += 1
            if job_id in _jobs:
                _jobs[job_id]["status"] = "failed"
                _jobs[job_id]["duration"] = time.time() - t0
                _jobs[job_id]["error"] = str(e)[:100]
            logger.exception("Worker %d failed %s", worker_id, job_id)

            # Exponential backoff on repeated failures (e.g. LLM throttling).
            if consecutive_failures > 1:
                backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)
                logger.warning("Worker %d backoff %ds", worker_id, backoff)
                await asyncio.sleep(backoff)

        finally:
            _queue.task_done()

        # Minimum pause between jobs per worker (rate-limit friendly).
        await asyncio.sleep(MIN_PAUSE_SECONDS)
|
||||
|
||||
|
||||
def start_worker() -> list[asyncio.Task]:
    """Start N worker coroutines. Call from the FastAPI startup event.

    Idempotent: a slot whose task is still running is left alone;
    finished or missing slots get a fresh worker task.

    Returns:
        The list of worker tasks (length ``CONCURRENCY``).
    """
    global _worker_tasks
    _stats["started_at"] = time.time()
    for i in range(CONCURRENCY):
        # Keep workers that are still alive.
        if i < len(_worker_tasks) and not _worker_tasks[i].done():
            continue
        task = asyncio.create_task(_worker(i))
        if i < len(_worker_tasks):
            _worker_tasks[i] = task
        else:
            _worker_tasks.append(task)
    logger.info("Queue: %d workers started (QUEUE_CONCURRENCY=%d)", CONCURRENCY, CONCURRENCY)
    return _worker_tasks
|
||||
|
||||
|
||||
async def re_enqueue_pending():
|
||||
"""Re-enqueue jobs with status='queued' from a previous run.
|
||||
|
||||
Called at startup to recover from container restarts. Only picks up
|
||||
jobs that were queued but never started processing.
|
||||
"""
|
||||
"""Mark stale queued jobs from previous run."""
|
||||
import aiosqlite
|
||||
from .config import settings
|
||||
|
||||
async with aiosqlite.connect(settings.db_path) as db:
|
||||
db.row_factory = aiosqlite.Row
|
||||
rows = await db.execute(
|
||||
"SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
|
||||
)
|
||||
rows = await db.execute("SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at")
|
||||
queued = await rows.fetchall()
|
||||
|
||||
if not queued:
|
||||
return
|
||||
|
||||
logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
|
||||
# We can't re-enqueue with the original callback here because we don't
|
||||
# know the original parameters. Mark them as 'stale' instead — the user
|
||||
# can re-trigger via the UI.
|
||||
import aiosqlite
|
||||
async with aiosqlite.connect(settings.db_path) as db:
|
||||
for row in queued:
|
||||
await db.execute(
|
||||
@ -162,4 +178,4 @@ async def re_enqueue_pending():
|
||||
(row["id"],),
|
||||
)
|
||||
await db.commit()
|
||||
logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
|
||||
logger.info("Marked %d stale jobs", len(queued))
|
||||
|
||||
@ -1387,18 +1387,43 @@
|
||||
|
||||
// Poll /api/queue/status every 3 s and render a live progress bar +
// per-job table into statusEl. Stops when the queue drains (with at
// least one completed job), after ~200 polls, or on a fetch error.
async function pollBatchQueue(statusEl) {
    for (let i = 0; i < 200; i++) {
        await new Promise(r => setTimeout(r, 3000));
        try {
            const qs = await fetch('/api/queue/status').then(r => r.json());

            // Render the job table (completed jobs link to the detail view).
            const jobs = qs.jobs || [];
            const jobsHtml = jobs.length > 0 ? `
                <table style="width:100%;font-size:0.8rem;border-collapse:collapse;margin-top:0.5rem;">
                    <tr style="color:#888;"><td>Drucksache</td><td>Status</td><td>Dauer</td></tr>
                    ${jobs.map(j => {
                        const statusIcon = j.status === 'completed' ? '✅' : j.status === 'processing' ? '⏳' : j.status === 'failed' ? '❌' : '⏸';
                        const dur = j.duration ? j.duration + 's' : '';
                        const link = j.status === 'completed' ? `<a href="#" onclick="showMode('browse');setTimeout(()=>showDetail('${j.drucksache}'),100);return false;" style="color:var(--color-blue);">${j.drucksache}</a>` : j.drucksache;
                        return `<tr style="border-top:1px solid #f0f0f0;"><td>${link}</td><td>${statusIcon} ${j.status}</td><td>${dur}</td></tr>`;
                    }).join('')}
                </table>` : '';

            // Progress bar: completed / total tracked jobs.
            const completed = jobs.filter(j => j.status === 'completed').length;
            const total = jobs.length;
            const pct = total > 0 ? Math.round(completed / total * 100) : 0;

            statusEl.innerHTML = `
                <div style="margin:0.5rem 0;">
                    <div style="background:#eee;border-radius:4px;height:8px;overflow:hidden;">
                        <div style="background:var(--color-green);height:100%;width:${pct}%;transition:width 0.5s;"></div>
                    </div>
                    <span style="font-size:0.8rem;color:#888;">${completed}/${total} fertig · ${qs.concurrency} Worker · ~${Math.round(qs.estimated_wait_seconds/60)} Min.</span>
                </div>
                ${jobsHtml}
            `;

            if (qs.pending === 0 && completed > 0) {
                statusEl.innerHTML += `<br><span style="color:var(--color-green);">✓ Alle Jobs abgeschlossen</span>`;
                loadAssessments();
                return;
            }
        } catch { break; }
    }
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user