diff --git a/app/auth.py b/app/auth.py
index eb5f278..67547e3 100644
--- a/app/auth.py
+++ b/app/auth.py
@@ -205,6 +205,27 @@ async def require_auth(request: Request) -> dict:
return user
+async def require_admin(request: Request) -> dict:
+ """Admin-Auth — gibt User-Dict oder HTTP 403.
+
+ Prüft ob der User die Rolle 'admin' oder 'gwoe-admin' hat.
+ Im Dev-Modus (Auth deaktiviert): durchlassen.
+ Für: Batch-Analyse, Programm-Indexierung, Assessment-Löschung.
+ """
+ if not _is_auth_enabled():
+ return {"sub": "anonymous", "email": "", "name": "Dev-Modus", "roles": ["admin"]}
+
+ user = await require_auth(request)
+ roles = user.get("roles", [])
+ if "admin" in roles or "gwoe-admin" in roles:
+ return user
+
+ raise HTTPException(
+ status_code=403,
+ detail="Admin-Berechtigung erforderlich",
+ )
+
+
# ─────────────────────────────────────────────────────────────────────────────
# Auth-Info-Endpoint
# ─────────────────────────────────────────────────────────────────────────────
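The dependency composes like any other FastAPI dependency. A minimal sketch of the intended wiring (the route below is illustrative, not part of this PR):

```python
from fastapi import Depends, FastAPI

from app.auth import require_admin

app = FastAPI()

@app.delete("/api/example")
async def delete_example(user: dict = Depends(require_admin)):
    # Reached only if require_admin returned a user dict, i.e. the caller
    # holds the 'admin' or 'gwoe-admin' role (or auth is disabled in dev mode).
    return {"deleted_by": user.get("email", "")}
```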
diff --git a/app/main.py b/app/main.py
index 188aa7c..28129f3 100644
--- a/app/main.py
+++ b/app/main.py
@@ -40,7 +40,7 @@ from .database import (
from .parlamente import get_adapter, ADAPTERS
from .bundeslaender import alle_bundeslaender
from .analyzer import analyze_antrag
-from .auth import get_current_user, require_auth, keycloak_login_url, _is_auth_enabled
+from .auth import get_current_user, require_auth, require_admin, keycloak_login_url, _is_auth_enabled
def _pick_best_title(llm_title: str, doc_title: Optional[str], drucksache: str) -> str:
@@ -487,7 +487,7 @@ async def get_single_assessment(drucksache: str):
@app.delete("/api/assessment/delete")
async def delete_assessment_endpoint(
drucksache: str,
- user: dict = Depends(require_auth),
+ user: dict = Depends(require_admin),
):
"""Löscht ein Assessment, damit es neu analysiert werden kann."""
drucksache = validate_drucksache(drucksache)
@@ -635,7 +635,7 @@ async def batch_analyze(
request: Request,
bundesland: str = Form(...),
limit: int = Form(10),
- user: dict = Depends(require_auth),
+ user: dict = Depends(require_admin),
):
"""Sucht die neuesten Drucksachen im Landtag-Portal und enqueued
alle, die noch nicht in der DB bewertet sind.
@@ -676,6 +676,7 @@ async def batch_analyze(
job_id,
run_drucksache_analysis,
job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
+ drucksache=doc.drucksache,
)
enqueued.append({
"drucksache": doc.drucksache,
@@ -738,6 +739,7 @@ async def analyze_drucksache(
job_id,
run_drucksache_analysis,
job_id, drucksache, text, bundesland, model, doc,
+ drucksache=drucksache,
)
except QueueFullError:
await update_job(job_id, status="rejected", error="Queue voll")
@@ -1066,7 +1068,7 @@ async def index_programme(
background_tasks: BackgroundTasks,
programm_id: str = Form(None),
all_programmes: bool = Form(False),
- user: dict = Depends(require_auth),
+ user: dict = Depends(require_admin),
):
"""Index programme(s) for semantic search."""
pdf_dir = static_dir / "referenzen"
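The new `drucksache=` keyword on the `enqueue(...)` calls feeds the per-job tracking added in app/queue.py below. Roughly, under the same call shape as the diff (the Drucksache number here is illustrative):

```python
from app.queue import enqueue, QueueFullError

try:
    position = await enqueue(
        job_id,
        run_drucksache_analysis,
        job_id, "21/1234", text, bundesland, "qwen-plus", doc,
        drucksache="21/1234",  # surfaced in the job table of /api/queue/status
    )
except QueueFullError:
    # Queue at MAX_QUEUE_SIZE: the endpoints mark the job rejected
    ...
```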
diff --git a/app/queue.py b/app/queue.py
index 220c27d..4f2a195 100644
--- a/app/queue.py
+++ b/app/queue.py
@@ -1,47 +1,39 @@
-"""Analysis job queue with single-worker processing (#95).
+"""Analysis job queue with configurable parallel workers (#95, #99).
-Implements a FIFO queue backed by the existing ``jobs`` SQLite table
-with an in-memory ``asyncio.Queue`` for the worker loop. On startup,
-any ``status='queued'`` jobs from a previous run are re-enqueued.
-
-Architecture:
-- ``enqueue(job_id, callback, *args)`` → puts a job on the queue
-- A single worker coroutine processes jobs sequentially (concurrency=1)
- with a configurable pause between calls (default 10s, DashScope-friendly)
-- ``get_queue_status()`` returns position count + estimated wait
-- ``MAX_QUEUE_SIZE`` limits backpressure → HTTP 429 when full
-
-The worker is started via ``start_worker()`` which should be called
-from the FastAPI startup event.
+Processes jobs from an asyncio.Queue with N concurrent worker tasks.
+Tracks per-job status for live UI visualization.
"""
import asyncio
import logging
+import os
import time
from typing import Any, Callable, Coroutine, Optional
logger = logging.getLogger(__name__)
# Configuration
-MAX_QUEUE_SIZE = 50 # Maximum queue depth before a 429 is returned
-MIN_PAUSE_SECONDS = 10 # Minimum pause between LLM calls (DashScope rate limit)
-BACKOFF_BASE = 15 # Exponential backoff on errors (seconds)
-BACKOFF_MAX = 300 # Maximum backoff (5 minutes)
+MAX_QUEUE_SIZE = 50
+CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3"))
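+# QUEUE_CONCURRENCY=1 roughly restores the old sequential behaviour (pause is now 3s, not 10s)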
+MIN_PAUSE_SECONDS = 3 # Pause per worker between jobs
+BACKOFF_BASE = 15
+BACKOFF_MAX = 300
-# In-memory queue
+# In-memory queue + job tracking
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
-_worker_task: Optional[asyncio.Task] = None
+_worker_tasks: list[asyncio.Task] = []
_stats = {
"processed": 0,
"failed": 0,
"started_at": None,
- "last_completed_at": None,
- "avg_duration": 60.0, # Initialer Schätzwert: 60s pro Job
+ "avg_duration": 60.0,
}
+# Live job tracking: job_id → {status, drucksache, started_at, duration, error}
+_jobs: dict[str, dict] = {}
+_MAX_TRACKED_JOBS = 100 # Oldest jobs are discarded
class QueueFullError(Exception):
- """Raised when the queue is at capacity."""
pass
@@ -49,112 +41,136 @@ async def enqueue(
job_id: str,
callback: Callable[..., Coroutine],
*args: Any,
+ drucksache: str = "",
**kwargs: Any,
) -> int:
- """Add a job to the queue. Returns the queue position (1-indexed).
-
- Raises ``QueueFullError`` if the queue is at capacity.
- """
+ """Add a job to the queue. Returns queue position."""
try:
_queue.put_nowait((job_id, callback, args, kwargs))
except asyncio.QueueFull:
- raise QueueFullError(
- f"Queue voll ({MAX_QUEUE_SIZE} Jobs). Bitte später erneut versuchen."
- )
+ raise QueueFullError(f"Queue voll ({MAX_QUEUE_SIZE} Jobs).")
+ _jobs[job_id] = {
+ "status": "queued",
+ "drucksache": drucksache,
+ "enqueued_at": time.time(),
+ "started_at": None,
+ "duration": None,
+ "error": None,
+ }
+ # Trim old job entries
+ if len(_jobs) > _MAX_TRACKED_JOBS:
+ oldest = sorted(_jobs, key=lambda k: _jobs[k].get("enqueued_at", 0))
+ for k in oldest[:len(_jobs) - _MAX_TRACKED_JOBS]:
+ del _jobs[k]
position = _queue.qsize()
- logger.info("Job %s enqueued at position %d", job_id, position)
+ logger.info("Job %s enqueued at position %d (concurrency=%d)", job_id, position, CONCURRENCY)
return position
def get_queue_status() -> dict:
- """Current queue status for the /api/queue/status endpoint."""
+ """Queue status + per-job details for UI visualization."""
pending = _queue.qsize()
avg = _stats["avg_duration"]
- estimated_wait = pending * (avg + MIN_PAUSE_SECONDS)
+ # With N workers the expected wait divides by N
+ estimated_wait = (pending / max(CONCURRENCY, 1)) * (avg + MIN_PAUSE_SECONDS)
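+ # E.g. 12 pending, 3 workers, avg 60s: (12 / 3) * (60 + 3) = 252s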
+
+ # Most recent jobs first, capped at 30 for the UI
+ recent = sorted(_jobs.items(), key=lambda kv: kv[1].get("enqueued_at", 0), reverse=True)[:30]
+
return {
"pending": pending,
"max_size": MAX_QUEUE_SIZE,
+ "concurrency": CONCURRENCY,
"processed_total": _stats["processed"],
"failed_total": _stats["failed"],
"estimated_wait_seconds": round(estimated_wait),
"avg_job_duration_seconds": round(avg, 1),
- "worker_running": _worker_task is not None and not _worker_task.done(),
+ "workers_running": sum(1 for t in _worker_tasks if not t.done()),
+ "jobs": [{
+ "job_id": jid,
+ "drucksache": j.get("drucksache", ""),
+ "status": j["status"],
+ "duration": round(j["duration"], 1) if j.get("duration") else None,
+ "error": j.get("error"),
+ } for jid, j in recent],
}
-async def _worker():
- """Single worker coroutine — processes jobs sequentially."""
- logger.info("Queue worker started (max_queue=%d, pause=%ds)", MAX_QUEUE_SIZE, MIN_PAUSE_SECONDS)
- _stats["started_at"] = time.time()
+async def _worker(worker_id: int):
+ """Worker coroutine — picks jobs from queue, processes with Semaphore."""
+ logger.info("Worker %d started", worker_id)
consecutive_failures = 0
while True:
job_id, callback, args, kwargs = await _queue.get()
t0 = time.time()
+ if job_id in _jobs:
+ _jobs[job_id]["status"] = "processing"
+ _jobs[job_id]["started_at"] = t0
+
try:
- logger.info("Processing job %s (queue remaining: %d)", job_id, _queue.qsize())
+ logger.info("Worker %d processing %s (queue: %d)", worker_id, job_id, _queue.qsize())
await callback(*args, **kwargs)
duration = time.time() - t0
_stats["processed"] += 1
- _stats["last_completed_at"] = time.time()
- # Moving average of job duration
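+ # Exponential moving average: 0.8 × previous estimate + 0.2 × latest duration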
_stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
consecutive_failures = 0
- logger.info("Job %s completed in %.1fs", job_id, duration)
- except Exception:
+ if job_id in _jobs:
+ _jobs[job_id]["status"] = "completed"
+ _jobs[job_id]["duration"] = duration
+ logger.info("Worker %d completed %s in %.1fs", worker_id, job_id, duration)
+
+ except Exception as e:
_stats["failed"] += 1
consecutive_failures += 1
- logger.exception("Job %s failed", job_id)
+ if job_id in _jobs:
+ _jobs[job_id]["status"] = "failed"
+ _jobs[job_id]["duration"] = time.time() - t0
+ _jobs[job_id]["error"] = str(e)[:100]
+ logger.exception("Worker %d failed %s", worker_id, job_id)
- # Exponential backoff on repeated failures (e.g. LLM throttling)
if consecutive_failures > 1:
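+ # Failure streak → sleep: 2 → 15s, 3 → 30s, 4 → 60s, ... capped at BACKOFF_MAX (300s)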
backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)
- logger.warning("Backoff %ds after %d consecutive failures", backoff, consecutive_failures)
+ logger.warning("Worker %d backoff %ds", worker_id, backoff)
await asyncio.sleep(backoff)
finally:
_queue.task_done()
- # Minimum pause between jobs (DashScope-friendly)
await asyncio.sleep(MIN_PAUSE_SECONDS)
-def start_worker() -> asyncio.Task:
- """Start the worker coroutine. Call from FastAPI startup event."""
- global _worker_task
- if _worker_task is not None and not _worker_task.done():
- logger.warning("Worker already running, not starting a second one")
- return _worker_task
- _worker_task = asyncio.create_task(_worker())
- return _worker_task
+def start_worker() -> list[asyncio.Task]:
+ """Start N worker coroutines."""
+ global _worker_tasks
+ _stats["started_at"] = time.time()
+ for i in range(CONCURRENCY):
+ if i < len(_worker_tasks) and not _worker_tasks[i].done():
+ continue
+ task = asyncio.create_task(_worker(i))
+ if i < len(_worker_tasks):
+ _worker_tasks[i] = task
+ else:
+ _worker_tasks.append(task)
+ logger.info("Queue: %d workers started (QUEUE_CONCURRENCY=%d)", CONCURRENCY, CONCURRENCY)
+ return _worker_tasks
async def re_enqueue_pending():
- """Re-enqueue jobs with status='queued' from a previous run.
-
- Called at startup to recover from container restarts. Only picks up
- jobs that were queued but never started processing.
- """
+ """Mark stale queued jobs from previous run."""
import aiosqlite
from .config import settings
async with aiosqlite.connect(settings.db_path) as db:
db.row_factory = aiosqlite.Row
- rows = await db.execute(
- "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at"
- )
+ rows = await db.execute("SELECT id FROM jobs WHERE status = 'queued' ORDER BY created_at")
queued = await rows.fetchall()
if not queued:
return
- logger.info("Re-enqueueing %d pending jobs from previous run", len(queued))
- # We can't re-enqueue with the original callback here because we don't
- # know the original parameters. Mark them as 'stale' instead — the user
- # can re-trigger via the UI.
- import aiosqlite
async with aiosqlite.connect(settings.db_path) as db:
for row in queued:
await db.execute(
@@ -162,4 +178,4 @@ async def re_enqueue_pending():
(row["id"],),
)
await db.commit()
- logger.info("Marked %d stale jobs (user must re-trigger)", len(queued))
+ logger.info("Marked %d stale jobs", len(queued))
diff --git a/app/templates/index.html b/app/templates/index.html
index 0c06336..cbbee97 100644
--- a/app/templates/index.html
+++ b/app/templates/index.html
@@ -1387,18 +1387,43 @@
async function pollBatchQueue(statusEl) {
for (let i = 0; i < 200; i++) {
- await new Promise(r => setTimeout(r, 5000));
+ await new Promise(r => setTimeout(r, 3000));
try {
const qs = await fetch('/api/queue/status').then(r => r.json());
- if (qs.pending === 0) {
- statusEl.innerHTML += `
- ✓ Alle Jobs abgeschlossen (${qs.processed_total} verarbeitet)`;
- loadAssessments(); // Refresh the list
+
+ // Render the job table
+ const jobs = qs.jobs || [];
+ const jobsHtml = jobs.length > 0 ? `
+ <table>
+ <tr><th>Drucksache</th><th>Status</th><th>Dauer</th></tr>
+ ${jobs.map(j => `<tr><td>${link}</td><td>${statusIcon} ${j.status}</td><td>${dur}</td></tr>`).join('')}