gwoe-antragspruefer/app/database.py
Dotty Dotter 9c70b463ac Phase A: Audit-Restbefunde #57.3/4/7 (Roadmap #59)
Drei verbleibende Audit-Befunde aus #57 in einem Patch:

- **#57.3 MEDIUM** Drucksache-Regex-Validation: neue
  app/validators.py mit validate_drucksache() als gemeinsamer
  Validation-Funnel. Pattern ^\d{1,3}/\d{1,7}([-(].{1,20})?$ deckt
  alle 10 aktiven Bundesländer (8/6390, 18/12345, 8/6390(neu),
  23/3700-A) ab und blockt Path-Traversal (../, /etc/passwd) plus
  Standard-Injection (;, <, &). Drei Endpoints durchgeschleust:
  /api/assessment, /api/assessment/pdf, /api/analyze-drucksache.

- **#57.4 MEDIUM** print() → logging.getLogger(__name__): main.py
  und analyzer.py auf strukturiertes Logging umgestellt. LLM-Inhalte
  werden NICHT mehr als Volltext geloggt — neue Helper
  _content_fingerprint() liefert nur "len=N sha1=XXXX", reicht zur
  Forensik ohne Antrag-Inhalte ins Container-Log zu leaken.
  basicConfig() mit ISO-Format setzt strukturiertes Logging früh,
  damit logger.exception() auch beim Boot greift.

- **#57.7 LOW-MED** Search-Query-Limit: validate_search_query() mit
  MAX_SEARCH_QUERY_LEN=200 schützt /api/search und /api/search-landtag
  vor 10-MB-Query-DoS. database._parse_search_query() loggt jetzt
  shlex.ValueError-Fallback statt ihn zu verschlucken (deckt Memory-
  Regel "stille excepts in Adaptern" ab).

Tests: neue tests/test_main_validators.py mit 22 Cases — Drucksache-
Whitelist-Roundtrip + Path-Traversal-Reject, Search-Query Längen-
Edge-Cases. 107 Unit-Tests grün (85 alt + 22 neu).

Validators in eigenem Modul (app/validators.py), damit Tests sie ohne
slowapi-Dependency direkt importieren können.

Refs: #57, #59 (Phase A)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 11:15:16 +02:00

349 lines
12 KiB
Python

"""SQLite database for job tracking."""
import aiosqlite
from datetime import datetime
from typing import Optional
from .config import settings
async def init_db():
    """Create the schema (jobs and assessments tables) if it is missing.

    Safe to call on every startup: both statements use
    ``CREATE TABLE IF NOT EXISTS``.
    """
    # One row per analysis request submitted through the webapp.
    jobs_ddl = """
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'queued',
            input_preview TEXT,
            bundesland TEXT DEFAULT 'NRW',
            model TEXT DEFAULT 'qwen-plus',
            result TEXT,
            html_path TEXT,
            pdf_path TEXT,
            error TEXT,
            user_id TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """
    # Pre-computed batch results plus analyses created live by the webapp.
    assessments_ddl = """
        CREATE TABLE IF NOT EXISTS assessments (
            drucksache TEXT PRIMARY KEY,
            title TEXT,
            fraktionen TEXT, -- JSON array
            datum TEXT,
            link TEXT,
            bundesland TEXT DEFAULT 'NRW',
            gwoe_score REAL,
            gwoe_begruendung TEXT,
            gwoe_matrix TEXT, -- JSON array
            gwoe_schwerpunkt TEXT, -- JSON array
            wahlprogramm_scores TEXT, -- JSON array
            verbesserungen TEXT, -- JSON array
            staerken TEXT, -- JSON array
            schwaechen TEXT, -- JSON array
            empfehlung TEXT,
            empfehlung_symbol TEXT,
            verbesserungspotenzial TEXT,
            themen TEXT, -- JSON array
            antrag_zusammenfassung TEXT,
            antrag_kernpunkte TEXT, -- JSON array
            source TEXT DEFAULT 'batch', -- 'batch' or 'webapp'
            model TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
    """
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(jobs_ddl)
        await db.execute(assessments_ddl)
        await db.commit()
async def create_job(
    job_id: str,
    input_preview: str,
    bundesland: str = "NRW",
    model: str = "qwen-plus",
    user_id: Optional[str] = None,
) -> dict:
    """Insert a new analysis job in 'queued' state and return a summary dict.

    created_at and updated_at are set to the same UTC ISO timestamp.
    """
    timestamp = datetime.utcnow().isoformat()
    insert_sql = (
        "INSERT INTO jobs (id, input_preview, bundesland, model, user_id, "
        "created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    values = (job_id, input_preview, bundesland, model, user_id, timestamp, timestamp)
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute(insert_sql, values)
        await db.commit()
    return {"id": job_id, "status": "queued", "created_at": timestamp}
async def get_job(job_id: str) -> Optional[dict]:
    """Look up one job by its ID; return it as a plain dict, or None if absent."""
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
        record = await cursor.fetchone()
        return dict(record) if record is not None else None
async def update_job(job_id: str, **kwargs) -> bool:
    """Update fields of an existing job.

    Keyword names are column names of the ``jobs`` table; ``updated_at``
    is always refreshed to the current UTC timestamp.

    Returns False when called with no fields, True otherwise (even when
    ``job_id`` matched no row — the UPDATE is simply a no-op then).

    Raises ValueError for unknown field names.
    """
    if not kwargs:
        return False
    # The field names are interpolated into the SQL statement below, so
    # restrict them to actual jobs columns — prevents SQL injection via
    # attacker-controlled kwargs keys.
    allowed = {
        "status", "input_preview", "bundesland", "model", "result",
        "html_path", "pdf_path", "error", "user_id",
        "created_at", "updated_at",
    }
    unknown = set(kwargs) - allowed
    if unknown:
        raise ValueError(f"Unknown job field(s): {', '.join(sorted(unknown))}")
    kwargs["updated_at"] = datetime.utcnow().isoformat()
    fields = ", ".join(f"{k} = ?" for k in kwargs.keys())
    values = list(kwargs.values()) + [job_id]
    async with aiosqlite.connect(settings.db_path) as db:
        # Only the placeholders carry user data; field names were validated.
        await db.execute(f"UPDATE jobs SET {fields} WHERE id = ?", values)
        await db.commit()
    return True
async def get_user_jobs(user_id: str, limit: int = 50) -> list[dict]:
    """Return up to ``limit`` jobs of one user, newest first (history page)."""
    select_sql = (
        "SELECT * FROM jobs WHERE user_id = ? "
        "ORDER BY created_at DESC LIMIT ?"
    )
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(select_sql, (user_id, limit))
        rows = await cursor.fetchall()
        return list(map(dict, rows))
async def upsert_assessment(data: dict) -> bool:
    """Insert or update an assessment.

    ``data`` carries the analyzer's camelCase/German keys (``gwoeScore``,
    ``gwoeBegründung``, ``stärken`` …); list-valued fields are serialized
    to JSON strings before storage. Always returns True.

    NOTE(review): the ON CONFLICT clause refreshes only title, gwoe_score,
    gwoe_begruendung, gwoe_matrix and updated_at — every other column
    keeps its previously stored value (created_at plausibly on purpose,
    the rest less clearly so). Confirm the partial update set is intended.
    """
    import json
    now = datetime.utcnow().isoformat()
    async with aiosqlite.connect(settings.db_path) as db:
        await db.execute("""
            INSERT INTO assessments (
                drucksache, title, fraktionen, datum, link, bundesland,
                gwoe_score, gwoe_begruendung, gwoe_matrix, gwoe_schwerpunkt,
                wahlprogramm_scores, verbesserungen, staerken, schwaechen,
                empfehlung, empfehlung_symbol, verbesserungspotenzial,
                themen, antrag_zusammenfassung, antrag_kernpunkte,
                source, model, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drucksache) DO UPDATE SET
                title = excluded.title,
                gwoe_score = excluded.gwoe_score,
                gwoe_begruendung = excluded.gwoe_begruendung,
                gwoe_matrix = excluded.gwoe_matrix,
                updated_at = excluded.updated_at
        """, (
            data.get("drucksache"),
            data.get("title"),
            json.dumps(data.get("fraktionen", [])),
            data.get("datum"),
            data.get("link"),
            data.get("bundesland", "NRW"),
            data.get("gwoeScore"),
            data.get("gwoeBegründung"),
            json.dumps(data.get("gwoeMatrix", [])),
            json.dumps(data.get("gwoeSchwerpunkt", [])),
            json.dumps(data.get("wahlprogrammScores", [])),
            json.dumps(data.get("verbesserungen", [])),
            json.dumps(data.get("stärken", [])),
            json.dumps(data.get("schwächen", [])),
            data.get("empfehlung"),
            data.get("empfehlungSymbol"),
            data.get("verbesserungspotenzial"),
            json.dumps(data.get("themen", [])),
            data.get("antragZusammenfassung"),
            json.dumps(data.get("antragKernpunkte", [])),
            data.get("source", "webapp"),
            data.get("model"),
            now, now
        ))
        await db.commit()
    return True
async def get_assessment(drucksache: str) -> Optional[dict]:
    """Get one assessment by its Drucksache ID, or None if not stored.

    JSON-encoded columns are decoded into Python lists; a column holding
    malformed JSON is left as its raw string value (best effort).
    """
    import json
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(
            "SELECT * FROM assessments WHERE drucksache = ?", (drucksache,)
        )
        row = await cursor.fetchone()
        if row is None:
            return None
        d = dict(row)
        # Decode the JSON-array columns written by upsert_assessment().
        for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                      "wahlprogramm_scores", "verbesserungen", "staerken",
                      "schwaechen", "themen", "antrag_kernpunkte"]:
            if d.get(field):
                try:
                    d[field] = json.loads(d[field])
                except (ValueError, TypeError):
                    # Was a bare `except:` that also swallowed
                    # KeyboardInterrupt/SystemExit; keep only the
                    # deliberate "leave the raw string" fallback.
                    pass
        return d
async def get_all_assessments(bundesland: Optional[str] = None) -> list[dict]:
    """Get all assessments, optionally filtered by Bundesland.

    The special value ``"ALL"`` and ``None`` mean no filter — both behave
    identically and return every row. Any other value becomes a strict
    ``WHERE bundesland = ?`` match. Rows are ordered by gwoe_score
    descending; JSON-encoded columns are decoded (malformed JSON is left
    as the raw string).
    """
    import json
    sql = "SELECT * FROM assessments"
    params: list = []
    if bundesland and bundesland != "ALL":
        sql += " WHERE bundesland = ?"
        params.append(bundesland)
    sql += " ORDER BY gwoe_score DESC"
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()
        results = []
        for row in rows:
            d = dict(row)
            # Decode the JSON-array columns written by upsert_assessment().
            for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                          "wahlprogramm_scores", "verbesserungen", "staerken",
                          "schwaechen", "themen", "antrag_kernpunkte"]:
                if d.get(field):
                    try:
                        d[field] = json.loads(d[field])
                    except (ValueError, TypeError):
                        # Narrowed from a bare `except:`; keep the raw
                        # string on decode failure.
                        pass
            results.append(d)
        return results
async def import_json_assessments(assessments_dir) -> int:
    """Import assessments from JSON files into the database.

    Reads every ``*.json`` file in ``assessments_dir`` (a path-like),
    marks it as ``source="batch"`` and upserts it. Import is best-effort:
    a file that fails to parse or store is logged (with traceback) and
    skipped.

    Returns the number of successfully imported files (0 when the
    directory does not exist).
    """
    import json
    import logging
    from pathlib import Path

    logger = logging.getLogger(__name__)
    dir_path = Path(assessments_dir)
    if not dir_path.exists():
        return 0
    count = 0
    for f in dir_path.glob("*.json"):
        try:
            # JSON is UTF-8 (RFC 8259); don't depend on the platform locale.
            data = json.loads(f.read_text(encoding="utf-8"))
            data["source"] = "batch"
            await upsert_assessment(data)
            count += 1
        except Exception:
            # print() → logging per audit #57.4; logger.exception keeps the
            # traceback without aborting the remaining imports.
            logger.exception("Error importing %s", f)
    return count
def _parse_search_query(query: str) -> tuple[list[str], bool]:
"""
Parse search query for AND logic and exact phrases.
Returns: (terms, is_exact)
Examples:
- 'Klimaschutz Energie' -> (['klimaschutz', 'energie'], False)
- '"Grüner Stahl"' -> (['grüner stahl'], True)
"""
query = query.strip()
# Check for exact phrase (entire query in quotes)
if query.startswith('"') and query.endswith('"') and query.count('"') == 2:
exact = query[1:-1].strip()
return ([exact.lower()], True)
# Extract quoted phrases and regular terms
import logging
import shlex
try:
parts = shlex.split(query)
except ValueError as e:
# Unbalanced quote — fall back to whitespace split, but log so we
# notice patterns of malformed queries (Issue #57 Befund #7).
logging.getLogger(__name__).warning(
"shlex.split failed on search query (%s), falling back to whitespace split", e
)
parts = query.split()
return ([p.lower() for p in parts], False)
async def search_assessments(query: str, bundesland: Optional[str] = None, limit: int = 50) -> list[dict]:
    """Search assessments by drucksache, title, themen or fraktionen.

    Supports AND logic across terms and exact-phrase matching for fully
    quoted queries (see ``_parse_search_query``). Only the first term is
    pre-filtered via SQL LIKE; the remaining terms / the exact phrase are
    enforced in Python on the decoded rows, so fewer than ``limit`` rows
    may be returned. ``bundesland`` of ``None`` or ``"ALL"`` means no
    regional filter.
    """
    import json
    terms, is_exact = _parse_search_query(query)
    # Narrow in SQL with the first term; an empty term list falls back to
    # the raw (lowercased) query string.
    first_term = terms[0] if terms else query.lower()
    sql = """
        SELECT * FROM assessments
        WHERE (
            LOWER(drucksache) LIKE ?
            OR LOWER(title) LIKE ?
            OR LOWER(themen) LIKE ?
            OR LOWER(fraktionen) LIKE ?
        )
    """
    params = [f"%{first_term}%"] * 4
    if bundesland and bundesland != "ALL":
        sql += " AND bundesland = ?"
        params.append(bundesland)
    sql += " ORDER BY gwoe_score DESC LIMIT ?"
    params.append(limit)
    async with aiosqlite.connect(settings.db_path) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()
        results = []
        for row in rows:
            d = dict(row)
            # Decode the JSON-array columns written by upsert_assessment().
            for field in ["fraktionen", "gwoe_matrix", "gwoe_schwerpunkt",
                          "wahlprogramm_scores", "verbesserungen", "staerken",
                          "schwaechen", "themen", "antrag_kernpunkte"]:
                if d.get(field):
                    try:
                        d[field] = json.loads(d[field])
                    except (ValueError, TypeError):
                        # Narrowed from a bare `except:`; keep the raw
                        # string on decode failure.
                        pass
            # Enforce AND / exact-phrase semantics on the decoded row.
            if len(terms) > 1 or is_exact:
                searchable = f"{d.get('title', '')} {d.get('drucksache', '')} {' '.join(d.get('fraktionen', []))} {' '.join(d.get('themen', []))}".lower()
                if is_exact:
                    if terms[0] not in searchable:
                        continue
                else:
                    if not all(term in searchable for term in terms):
                        continue
            results.append(d)
        return results