#44 Batch-Analyse: POST /api/batch-analyze

Neuer Endpoint der die neuesten ungeprüften Drucksachen eines BL
automatisch sucht, herunterlädt und in die Queue (#95) einreiht:

POST /api/batch-analyze
  bundesland=NRW  (Pflicht)
  limit=10        (1-100, default 10)

Flow:
1. adapter.search("", limit=limit*3) holt neueste Drucksachen
2. Pro Drucksache: check ob schon bewertet → skip
3. download_text → enqueue(run_drucksache_analysis)
4. Queue verarbeitet seriell mit 10s Pause (DashScope-freundlich)

Response:
{
  "status": "batch_enqueued",
  "enqueued": 7,
  "skipped_existing": 3,
  "jobs": [{"drucksache": "18/...", "title": "...", "queue_position": 1}, ...]
}

Rate-limited auf 3/min. Erfordert Auth (#43).
Bei voller Queue: enqueued nur soweit Platz, kein Error.

Tests: 201 passed.

Refs: #44, #95 (Queue-Basis)
This commit is contained in:
Dotty Dotter 2026-04-10 17:26:05 +02:00
parent 289d37a84b
commit 6a433e9217

View File

@ -504,6 +504,73 @@ async def search_landtag(
return {"error": f"Suchfehler: {str(e)}"}
# API: Batch-Analyse (#44) — enqueued ungeprüfte Drucksachen eines BL
@app.post("/api/batch-analyze")
@limiter.limit("3/minute")
async def batch_analyze(
request: Request,
bundesland: str = Form(...),
limit: int = Form(10),
user: dict = Depends(require_auth),
):
"""Sucht die neuesten Drucksachen im Landtag-Portal und enqueued
alle, die noch nicht in der DB bewertet sind.
Returns: Liste der enqueued Drucksachen + Queue-Position.
"""
from .queue import enqueue, QueueFullError
if limit < 1 or limit > 100:
raise HTTPException(status_code=400, detail="limit muss 1-100 sein")
adapter = get_adapter(bundesland)
if not adapter:
raise HTTPException(status_code=400, detail=f"Bundesland {bundesland} nicht unterstützt")
# Neueste Drucksachen vom Landtag holen (leer = neueste Anträge)
drucksachen = await adapter.search("", limit=limit * 3) # 3× holen wegen Typ-Filter
enqueued = []
skipped = 0
for doc in drucksachen:
if len(enqueued) >= limit:
break
# Schon bewertet?
existing = await get_assessment(doc.drucksache)
if existing:
skipped += 1
continue
# Text herunterladen
text = await adapter.download_text(doc.drucksache)
if not text:
continue
# Enqueue
job_id = str(uuid.uuid4())
await create_job(job_id, text[:500], bundesland, "qwen-plus")
try:
position = await enqueue(
job_id,
run_drucksache_analysis,
job_id, doc.drucksache, text, bundesland, "qwen-plus", doc,
)
enqueued.append({
"drucksache": doc.drucksache,
"title": doc.title,
"job_id": job_id,
"queue_position": position,
})
except QueueFullError:
break
return {
"status": "batch_enqueued",
"bundesland": bundesland,
"enqueued": len(enqueued),
"skipped_existing": skipped,
"jobs": enqueued,
}
# API: Analyze a document from parliament portal
@app.post("/api/analyze-drucksache")
@limiter.limit("10/minute")