2026-04-10 23:15:42 +02:00
|
|
|
"""Analysis job queue with configurable parallel workers (#95, #99).

Processes jobs via an asyncio.Queue with N concurrent worker tasks
(N is read from the QUEUE_CONCURRENCY environment variable).
Tracks per-job status for live UI visualization.
"""
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
2026-04-10 23:15:42 +02:00
|
|
|
import os
|
2026-04-10 17:24:34 +02:00
|
|
|
import time
|
|
|
|
|
from typing import Any, Callable, Coroutine, Optional
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Configuration
MAX_QUEUE_SIZE = 50  # capacity of the in-memory asyncio queue
CONCURRENCY = int(os.environ.get("QUEUE_CONCURRENCY", "3"))  # number of worker tasks
MIN_PAUSE_SECONDS = 3  # pause a worker takes after every job
_shutting_down = False  # blocks new jobs during graceful shutdown
BACKOFF_BASE = 15  # seconds; first backoff step after repeated failures
BACKOFF_MAX = 300  # seconds; upper bound for the exponential backoff

# In-memory queue + job tracking
_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
_worker_tasks: list[asyncio.Task] = []
_stats = {
    "processed": 0,
    "failed": 0,
    "started_at": None,
    "avg_duration": 60.0,  # initial value of the moving average, in seconds
}
# Live job tracking: job_id -> {status, drucksache, started_at, duration, error}
_jobs: dict[str, dict] = {}
_MAX_TRACKED_JOBS = 100  # the oldest tracked jobs are discarded beyond this
|
2026-04-10 17:24:34 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class QueueFullError(Exception):
    """Raised when a job cannot be accepted by the queue.

    ``enqueue`` raises it both when the queue has reached its maximum size
    and when the module is shutting down and refuses new jobs.
    """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def enqueue(
    job_id: str,
    callback: Callable[..., Coroutine],
    *args: Any,
    drucksache: str = "",
    **kwargs: Any,
) -> int:
    """Add a job to the queue and register it for live status tracking.

    Args:
        job_id: Key under which the job is tracked in ``_jobs``.
        callback: Coroutine function; a worker awaits ``callback(*args, **kwargs)``.
        *args: Positional arguments forwarded to ``callback``.
        drucksache: Label stored with the tracking entry. It is consumed here
            and is NOT forwarded to ``callback``.
        **kwargs: Keyword arguments forwarded to ``callback``.

    Returns:
        The queue size right after insertion (the job's queue position).

    Raises:
        QueueFullError: If a graceful shutdown is in progress or the queue
            already holds ``MAX_QUEUE_SIZE`` jobs.
    """
    if _shutting_down:
        raise QueueFullError("Server wird neu gestartet. Bitte in Kürze erneut versuchen.")

    try:
        _queue.put_nowait((job_id, callback, args, kwargs))
    except asyncio.QueueFull:
        # The asyncio exception carries no extra information for callers.
        raise QueueFullError(f"Queue voll ({MAX_QUEUE_SIZE} Jobs).") from None

    _jobs[job_id] = {
        "status": "queued",
        "drucksache": drucksache,
        "enqueued_at": time.time(),
        "started_at": None,
        "duration": None,
        "error": None,
    }

    # Trim the tracking dict: drop the entries with the oldest enqueue time.
    overflow = len(_jobs) - _MAX_TRACKED_JOBS
    if overflow > 0:
        oldest = sorted(_jobs, key=lambda k: _jobs[k].get("enqueued_at", 0))
        for k in oldest[:overflow]:
            del _jobs[k]

    position = _queue.qsize()
    logger.info("Job %s enqueued at position %d (concurrency=%d)", job_id, position, CONCURRENCY)
    return position
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_queue_status() -> dict:
    """Return queue metrics plus per-job details for UI visualization.

    The result combines three sources:

    * counters and the moving average kept in ``_stats``,
    * the last 30 entries (insertion order) of the in-memory ``_jobs`` dict,
    * up to 20 rows from the ``jobs`` DB table with status 'stale', 'queued'
      or 'processing' that are NOT tracked in memory. Those are reported with
      status "stale" and error "Container-Restart".

    Loading the DB rows is best effort: any failure is logged at debug level
    and the DB part of the list is simply left empty.

    Returns:
        Dict with the keys ``pending``, ``max_size``, ``concurrency``,
        ``shutting_down``, ``processed_total``, ``failed_total``,
        ``estimated_wait_seconds``, ``avg_job_duration_seconds``,
        ``workers_running`` and ``jobs``.
    """
    pending = _queue.qsize()
    avg = _stats["avg_duration"]
    # With N workers the waiting time is shared between them.
    estimated_wait = (pending / max(CONCURRENCY, 1)) * (avg + MIN_PAUSE_SECONDS)

    # Load leftover jobs from the DB (e.g. after a container restart).
    stale_jobs = []
    try:
        import sqlite3
        from .config import settings

        # Exclude jobs that are tracked in memory; otherwise a live job would
        # also show up a second time as "stale".
        tracked_ids = list(_jobs)
        query = (
            "SELECT id, bundesland, drucksache, status, created_at FROM jobs "
            "WHERE status IN ('stale', 'queued', 'processing')"
        )
        if tracked_ids:
            # Only '?' placeholders are interpolated; the ids are bound as parameters.
            query += " AND id NOT IN (%s)" % ",".join("?" * len(tracked_ids))
        query += " ORDER BY created_at DESC LIMIT 20"

        conn = sqlite3.connect(settings.db_path)
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, tracked_ids).fetchall()
        finally:
            # Close the connection even when the query fails.
            conn.close()

        stale_jobs = [
            {
                "job_id": r["id"],
                "bundesland": r["bundesland"] or "",
                "status": "stale",
                "drucksache": r["drucksache"] or "",
                "duration": None,
                "error": "Container-Restart",
            }
            for r in rows
        ]
    except Exception:
        # Best effort: the status endpoint must keep working without the DB.
        logger.debug("Could not load stale jobs from DB", exc_info=True)

    tracked_jobs = [
        {
            "job_id": jid,
            "drucksache": j.get("drucksache", ""),
            "status": j["status"],
            "duration": round(j["duration"], 1) if j.get("duration") is not None else None,
            "error": j.get("error"),
        }
        for jid, j in list(_jobs.items())[-30:]
    ]

    return {
        "pending": pending,
        "max_size": MAX_QUEUE_SIZE,
        "concurrency": CONCURRENCY,
        "shutting_down": _shutting_down,
        "processed_total": _stats["processed"],
        "failed_total": _stats["failed"],
        "estimated_wait_seconds": round(estimated_wait),
        "avg_job_duration_seconds": round(avg, 1),
        "workers_running": sum(1 for t in _worker_tasks if not t.done()),
        "jobs": tracked_jobs + stale_jobs,
    }
|
|
|
|
|
|
|
|
|
|
|
2026-04-10 23:15:42 +02:00
|
|
|
async def _worker(worker_id: int):
    """Worker coroutine: take jobs from the queue and run them one at a time.

    Parallelism comes from running several of these coroutines as tasks; each
    worker itself processes its jobs sequentially. For every job the tracking
    entry in ``_jobs`` (if still present) and the counters in ``_stats`` are
    updated. After the second consecutive failure the worker waits with an
    exponential backoff (``BACKOFF_BASE`` doubling up to ``BACKOFF_MAX``), and
    after every job it pauses for ``MIN_PAUSE_SECONDS``.

    The loop ends once ``_shutting_down`` is set, so no further queued jobs
    are started during a graceful shutdown; a job that is already running is
    always finished first.

    Args:
        worker_id: Number used only in log messages.
    """
    logger.info("Worker %d started", worker_id)
    consecutive_failures = 0

    while not _shutting_down:
        job_id, callback, args, kwargs = await _queue.get()
        if _shutting_down:
            # Shutdown began while this worker was waiting: do not start the job.
            _queue.task_done()
            break

        t0 = time.time()
        backoff = 0

        if job_id in _jobs:
            _jobs[job_id]["status"] = "processing"
            _jobs[job_id]["started_at"] = t0

        try:
            logger.info("Worker %d processing %s (queue: %d)", worker_id, job_id, _queue.qsize())
            await callback(*args, **kwargs)
            duration = time.time() - t0
            _stats["processed"] += 1
            # Exponential moving average: 80% history, 20% latest job.
            _stats["avg_duration"] = (_stats["avg_duration"] * 0.8) + (duration * 0.2)
            consecutive_failures = 0

            if job_id in _jobs:
                _jobs[job_id]["status"] = "completed"
                _jobs[job_id]["duration"] = duration
            logger.info("Worker %d completed %s in %.1fs", worker_id, job_id, duration)

        except Exception as e:
            _stats["failed"] += 1
            consecutive_failures += 1
            if job_id in _jobs:
                _jobs[job_id]["status"] = "failed"
                _jobs[job_id]["duration"] = time.time() - t0
                _jobs[job_id]["error"] = str(e)[:100]
            logger.exception("Worker %d failed %s", worker_id, job_id)

            if consecutive_failures > 1:
                backoff = min(BACKOFF_BASE * (2 ** (consecutive_failures - 2)), BACKOFF_MAX)

        finally:
            _queue.task_done()

        # Sleep only after task_done() so the finished job is not reported as
        # unfinished to the queue for the whole backoff period.
        if backoff:
            logger.warning("Worker %d backoff %ds", worker_id, backoff)
            await asyncio.sleep(backoff)

        await asyncio.sleep(MIN_PAUSE_SECONDS)

    logger.info("Worker %d stopped (shutdown in progress)", worker_id)
|
|
|
|
|
|
|
|
|
|
|
2026-04-10 23:15:42 +02:00
|
|
|
def start_worker() -> list[asyncio.Task]:
    """Start the worker tasks, replacing any that have already finished.

    For each slot ``0 .. CONCURRENCY - 1`` a new ``_worker`` task is created
    unless the slot already holds a task that is still running. Also records
    the current time in ``_stats["started_at"]``.

    Uses ``asyncio.create_task`` and therefore has to be called while an
    event loop is running.

    Returns:
        The module-level list of worker tasks.
    """
    _stats["started_at"] = time.time()
    started = 0
    for i in range(CONCURRENCY):
        if i < len(_worker_tasks):
            if not _worker_tasks[i].done():
                continue  # this slot's worker is still alive
            _worker_tasks[i] = asyncio.create_task(_worker(i))
        else:
            _worker_tasks.append(asyncio.create_task(_worker(i)))
        started += 1
    # Report how many tasks were actually created, not just the configured value.
    logger.info("Queue: %d workers started (QUEUE_CONCURRENCY=%d)", started, CONCURRENCY)
    return _worker_tasks
|
2026-04-10 17:24:34 +02:00
|
|
|
|
|
|
|
|
|
2026-04-10 23:20:23 +02:00
|
|
|
async def graceful_shutdown(timeout: int = 900):
    """Graceful shutdown: block new jobs and wait for running jobs to finish.

    1. Sets ``_shutting_down`` so that ``enqueue`` rejects new jobs.
    2. Polls every 2 seconds until no tracked job has status "processing",
       or until ``timeout`` seconds have elapsed.
    3. Jobs that are still queued are not waited for; per the log messages
       they are expected to become 'stale' on restart so the user can
       trigger them again.

    The default timeout is 15 minutes (900 s). Original author's estimate:
    a single LLM call takes at most ~120 s, so the real waiting time with
    parallel workers should be around ~120 s.

    Args:
        timeout: Maximum number of seconds to wait for running jobs.
    """
    global _shutting_down
    _shutting_down = True

    def processing_count() -> int:
        return sum(1 for j in _jobs.values() if j.get("status") == "processing")

    processing = processing_count()
    pending = _queue.qsize()

    if processing == 0:
        logger.info("Graceful shutdown: keine laufenden Jobs, sofort beenden (%d queued verworfen)", pending)
        return

    logger.warning("Graceful shutdown: warte auf %d laufende Jobs (max %ds). %d queued werden beim Restart stale.",
                   processing, timeout, pending)

    # Wait only for the running jobs, not for the whole queue. A monotonic
    # clock keeps the timeout correct if the system time is adjusted.
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        if processing_count() == 0:
            logger.info("Graceful shutdown: alle laufenden Jobs beendet nach %.0fs", time.monotonic() - start)
            return
        await asyncio.sleep(2)

    logger.error("Graceful shutdown: Timeout nach %ds, %d Jobs noch aktiv",
                 timeout, processing_count())
|
2026-04-10 23:17:46 +02:00
|
|
|
|
|
|
|
|
|
feat(#139,#129,#138,#141): v2-Frontend (ECOnGOOD-CD), Login-Modal, Auto-DL, OG-Cards
v2-Frontend (#139, ECOnGOOD CD Manual Juni 2024):
- app/static/v2/: tokens.css, fonts.css, v2.css, Nunito-Sans woff2, Phosphor-Icons (21 SVGs)
- app/templates/v2/: base.html + 11 Screens + 8 Component-Macros
- AppShell mit Sidebar (Lesen/Pruefen/Daten/Admin), v2-Detail mit allen Features
(ScoreHero, MatrixMini, QuoteCard, Redline, Fraktions-Scores)
- v2 ist jetzt Default unter / — classic unter /classic
- Login-Modal in v2-Topbar mit Tabs Anmelden/Registrieren (#129)
- Phosphor-Icons in Sidebar + Topbar mit dynamischem Theme-Toggle
- Keyboard-Shortcuts (j/k/Enter/Esc/?/path), Landtag-Suche, Antrag-Historie,
Sort-Dropdown, Matrix-Feld-Info-Modal, Bookmarks/Comments/Voting/Share/Re-Analyze
Backend-Erweiterungen:
- main.py: ~30 neue Routes (/v2/*, /antrag/{ds}, /api/auth/{login,refresh,logout},
/api/me/merkliste/*, /api/admin/*, /v2/admin/*, OG-Cards, etc.)
- og_card.py + og_template: Open-Graph-Bilder via Playwright (#141)
- wahlprogramm_fetch.py + wahlprogramm-links.yaml: SHA-Gate Auto-DL (#138)
- auswertungen.py: BL-Filter + get_wahlperioden Helper (#137)
- auth.py: Direct-Access-Grant + Refresh-Token-Cookie
Classic-Updates:
- Header-DRY via _header.html, Auswertungen redirected, Batch-Inline raus
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 20:55:57 +02:00
|
|
|
async def _mark_job_stale(db_path, job_id, error=None):
    """Set one ``jobs`` row to status 'stale', optionally storing an error text."""
    import aiosqlite

    async with aiosqlite.connect(db_path) as db:
        if error is None:
            await db.execute(
                "UPDATE jobs SET status='stale', updated_at=datetime('now') WHERE id=?",
                (job_id,),
            )
        else:
            await db.execute(
                "UPDATE jobs SET status='stale', error=?, updated_at=datetime('now') WHERE id=?",
                (error, job_id),
            )
        await db.commit()


async def re_enqueue_pending(analysis_callback=None):
    """Re-enqueue jobs that were queued or processing when the container died.

    Reads all ``jobs`` rows with status 'queued' or 'processing' (oldest
    first) and handles each one:

    * Rows WITH a drucksache are re-enqueued (if a callback is provided):
      the document and its text are fetched again through the parliament
      adapter for the row's bundesland, then the job is put on the queue.
      If anything in that step fails, the row is marked 'stale' and the
      error text (first 200 characters) is stored.
    * Rows WITHOUT a drucksache (legacy), or all rows when no callback is
      given, are marked 'stale'.

    When there are no such rows at all, stale rows without a drucksache are
    deleted instead.

    Args:
        analysis_callback: async function(job_id, drucksache, text, bundesland, model, doc)
    """
    import aiosqlite
    from .config import settings

    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        rows = await db.execute(
            "SELECT id, bundesland, drucksache, model FROM jobs "
            "WHERE status IN ('queued', 'processing') ORDER BY created_at"
        )
        pending = await rows.fetchall()

    if not pending:
        # Clean up old stale jobs that have no drucksache.
        async with aiosqlite.connect(settings.db_path) as db:
            deleted = await db.execute(
                "DELETE FROM jobs WHERE status='stale' AND (drucksache IS NULL OR drucksache='')"
            )
            if deleted.rowcount > 0:
                logger.info("Cleaned up %d legacy stale jobs without drucksache", deleted.rowcount)
            await db.commit()
        return

    logger.info("Found %d pending jobs from previous run", len(pending))

    from .parlamente import get_adapter

    re_enqueued = 0
    marked_stale = 0
    for row in pending:
        job_id = row["id"]
        bundesland = row["bundesland"] or "NRW"
        drucksache = row["drucksache"]
        model = row["model"] or "qwen-plus"

        if not drucksache or not analysis_callback:
            # Legacy job without drucksache, or no callback -> mark stale.
            await _mark_job_stale(settings.db_path, job_id)
            marked_stale += 1
            continue

        # Job with drucksache -> enqueue it again.
        try:
            adapter = get_adapter(bundesland)
            doc = await adapter.get_document(drucksache)
            if not doc:
                raise ValueError(f"Drucksache {drucksache} nicht gefunden")
            text = await adapter.download_text(drucksache)
            if not text:
                raise ValueError(f"PDF-Text für {drucksache} leer")

            position = await enqueue(
                job_id,
                analysis_callback,
                job_id, drucksache, text, bundesland, model, doc,
                drucksache=drucksache,
            )
            re_enqueued += 1
            logger.info("Re-enqueued %s (%s) at position %d", drucksache, bundesland, position)

        except Exception as e:
            # Deliberately broad: one broken job must not abort the recovery
            # of the remaining ones (this also covers QueueFullError).
            logger.warning("Could not re-enqueue %s (%s): %s — marking stale", drucksache, bundesland, e)
            await _mark_job_stale(settings.db_path, job_id, error=str(e)[:200])
            marked_stale += 1

    logger.info("Re-enqueued %d jobs, marked %d stale", re_enqueued, marked_stale)
|