gwoe-antragspruefer/tests/test_auth.py

469 lines
18 KiB
Python
Raw Normal View History

"""Tests for app/auth.py — Keycloak JWT authentication (#43, #129).
These tests cover the auth module WITHOUT a running Keycloak server:
- Token extraction from headers/cookies
- Auth-disabled detection (Dev-Modus)
- direct_login Keycloak Direct Access Grant (gemockt via httpx)
- _pick_best_title helper (in main.py, tested here for convenience)
"""
import sys
import types
# Stub jose if not installed locally
if "jose" not in sys.modules:
jose_stub = types.ModuleType("jose")
jose_jwt = types.ModuleType("jose.jwt")
jose_stub.jwt = jose_jwt
jose_stub.JWTError = Exception
jose_stub.ExpiredSignatureError = Exception
jose_jwt.get_unverified_header = lambda t: {"kid": "test"}
jose_jwt.decode = lambda *a, **kw: {"sub": "test", "email": "t@t.de"}
sys.modules["jose"] = jose_stub
sys.modules["jose.jwt"] = jose_jwt
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import HTTPException
from app.auth import _extract_token, _is_auth_enabled
class TestExtractToken:
def test_bearer_header(self):
req = MagicMock()
req.headers = {"authorization": "Bearer abc123"}
req.cookies = {}
assert _extract_token(req) == "abc123"
def test_cookie_fallback(self):
req = MagicMock()
req.headers = {}
req.cookies = {"access_token": "cookie_token"}
assert _extract_token(req) == "cookie_token"
def test_no_token(self):
req = MagicMock()
req.headers = {}
req.cookies = {}
assert _extract_token(req) is None
def test_non_bearer_header_ignored(self):
req = MagicMock()
req.headers = {"authorization": "Basic dXNlcjpwYXNz"}
req.cookies = {}
assert _extract_token(req) is None
class TestIsAuthEnabled:
def test_disabled_when_url_empty(self, monkeypatch):
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "")
monkeypatch.setattr(config.settings, "keycloak_realm", "test")
monkeypatch.setattr(config.settings, "keycloak_client_id", "test")
assert _is_auth_enabled() is False
def test_disabled_when_realm_empty(self, monkeypatch):
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "")
monkeypatch.setattr(config.settings, "keycloak_client_id", "test")
assert _is_auth_enabled() is False
def test_enabled_when_all_set(self, monkeypatch):
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "realm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "client")
assert _is_auth_enabled() is True
class TestDirectLogin:
"""Tests für direct_login() in auth.py — Keycloak Direct Access Grant (#129).
Alle Keycloak-HTTP-Calls werden via unittest.mock.patch gemockt.
Kein laufender Keycloak-Server nötig.
"""
def _run(self, coro):
return asyncio.get_event_loop().run_until_complete(coro)
def _make_resp(self, status_code: int, body: dict):
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = body
return resp
def test_success_returns_token_data(self, monkeypatch):
"""Bei 200 von Keycloak gibt direct_login das Token-Dict zurück."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient")
token_response = {
"access_token": "eyABC",
"refresh_token": "ryDEF",
"expires_in": 300,
"refresh_expires_in": 1800,
}
mock_resp = self._make_resp(200, token_response)
async def _mock_post(*args, **kwargs):
return mock_resp
mock_client = AsyncMock()
mock_client.post = _mock_post
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("app.auth.httpx.AsyncClient", return_value=mock_client):
from app.auth import direct_login
result = self._run(direct_login("user", "pw"))
assert result["access_token"] == "eyABC"
assert result["refresh_token"] == "ryDEF"
assert result["expires_in"] == 300
def test_invalid_credentials_raises_401(self, monkeypatch):
"""Bei 401 von Keycloak wirft direct_login HTTPException(status_code=401)."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient")
mock_resp = self._make_resp(401, {"error": "invalid_grant", "error_description": "Ungültige Anmeldedaten"})
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_resp)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("app.auth.httpx.AsyncClient", return_value=mock_client):
from app.auth import direct_login
with pytest.raises(HTTPException) as exc_info:
self._run(direct_login("user", "falsch"))
assert exc_info.value.status_code == 401
assert "Ungültige Anmeldedaten" in exc_info.value.detail
def test_keycloak_error_raises_non_401(self, monkeypatch):
"""Bei 500 von Keycloak wirft direct_login HTTPException mit dem Keycloak-Statuscode."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test")
monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm")
monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient")
mock_resp = self._make_resp(500, {"error_description": "Internal Server Error"})
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_resp)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("app.auth.httpx.AsyncClient", return_value=mock_client):
from app.auth import direct_login
with pytest.raises(HTTPException) as exc_info:
self._run(direct_login("user", "pw"))
assert exc_info.value.status_code == 500
def test_auth_disabled_raises_400(self, monkeypatch):
"""Wenn Auth nicht konfiguriert ist, wirft direct_login HTTPException(400)."""
from app import config
monkeypatch.setattr(config.settings, "keycloak_url", "")
monkeypatch.setattr(config.settings, "keycloak_realm", "")
monkeypatch.setattr(config.settings, "keycloak_client_id", "")
from app.auth import direct_login
with pytest.raises(HTTPException) as exc_info:
self._run(direct_login("user", "pw"))
assert exc_info.value.status_code == 400
try:
from app.main import _pick_best_title
_HAS_MAIN = True
except ImportError:
_HAS_MAIN = False
@pytest.mark.skipif(not _HAS_MAIN, reason="app.main not importable (missing slowapi/etc)")
class TestPickBestTitle:
"""Test _pick_best_title from main.py."""
def test_prefer_real_doc_title(self):
assert _pick_best_title(
"LLM-Titel", "Echte Antrag-Bezeichnung aus OPAL", "18/123"
) == "Echte Antrag-Bezeichnung aus OPAL"
def test_reject_generic_doc_title(self):
from app.main import _pick_best_title
result = _pick_best_title(
"Lehrkräfte stärken", "Drucksache 18/18085", "18/18085"
)
assert result == "Lehrkräfte stärken"
def test_fallback_to_llm_title(self):
assert _pick_best_title("LLM-Titel", None, "18/123") == "LLM-Titel"
def test_fallback_to_generic(self):
assert _pick_best_title("", None, "18/123") == "Drucksache 18/123"
def test_empty_doc_title_uses_llm(self):
assert _pick_best_title("Guter LLM-Titel", "", "18/123") == "Guter LLM-Titel"
# ─── Coverage-Backfill (#134) ────────────────────────────────────────────────
class TestKeycloakUrls:
def test_issuer_includes_realm(self, monkeypatch):
from app import auth
from app.config import settings
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
monkeypatch.setattr(settings, "keycloak_realm", "myrealm")
assert auth._keycloak_issuer() == "https://sso.example/realms/myrealm"
def test_jwks_url_appends_certs(self, monkeypatch):
from app import auth
from app.config import settings
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
monkeypatch.setattr(settings, "keycloak_realm", "myrealm")
assert auth._keycloak_jwks_url() == (
"https://sso.example/realms/myrealm/protocol/openid-connect/certs"
)
class TestGetJwks:
"""JWKS-Cache-Verhalten + HTTP-Fehler-Pfad."""
@pytest.mark.asyncio
async def test_returns_cached_when_fresh(self, monkeypatch):
from app import auth
import time as _time
# Stelle sicher: Cache ist gesetzt + nicht abgelaufen
monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "abc"}]})
monkeypatch.setattr(auth, "_jwks_cache_time", _time.time())
result = await auth._get_jwks()
assert result == {"keys": [{"kid": "abc"}]}
@pytest.mark.asyncio
async def test_fetches_when_cache_empty(self, monkeypatch):
from app import auth
import httpx as _httpx
from unittest.mock import AsyncMock, MagicMock, patch
monkeypatch.setattr(auth, "_jwks_cache", {})
monkeypatch.setattr(auth, "_jwks_cache_time", 0)
fake_resp = MagicMock(status_code=200, json=lambda: {"keys": [{"kid": "new"}]})
async def fake_get(self, url):
return fake_resp
with patch.object(_httpx.AsyncClient, "get", fake_get):
result = await auth._get_jwks()
assert "keys" in result
assert result["keys"][0]["kid"] == "new"
@pytest.mark.asyncio
async def test_http_error_returns_stale_cache(self, monkeypatch):
from app import auth
import httpx as _httpx
from unittest.mock import patch
# Stale cache vorhanden
monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "old"}]})
monkeypatch.setattr(auth, "_jwks_cache_time", 0) # abgelaufen
async def failing_get(self, url):
raise _httpx.ConnectError("network down")
with patch.object(_httpx.AsyncClient, "get", failing_get):
result = await auth._get_jwks()
# Stale-Cache wird zurueckgegeben
assert result == {"keys": [{"kid": "old"}]}
class TestValidateToken:
"""_validate_token: Schlüssel-Lookup, Payload-Mapping."""
@pytest.mark.asyncio
async def test_no_jwks_returns_none(self, monkeypatch):
from app import auth
async def fake_jwks():
return {}
monkeypatch.setattr(auth, "_get_jwks", fake_jwks)
result = await auth._validate_token("any-token")
assert result is None
class TestGetCurrentUser:
@pytest.mark.asyncio
async def test_returns_none_when_auth_disabled(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
request = MagicMock(headers={}, cookies={})
assert await auth.get_current_user(request) is None
@pytest.mark.asyncio
async def test_returns_none_when_no_token(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
request = MagicMock()
request.headers.get = lambda k, d="": ""
request.cookies.get = lambda k: None
assert await auth.get_current_user(request) is None
class TestRequireAuth:
@pytest.mark.asyncio
async def test_dev_mode_returns_anonymous(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
request = MagicMock()
user = await auth.require_auth(request)
assert user["sub"] == "anonymous"
assert "Dev-Modus" in user["name"]
@pytest.mark.asyncio
async def test_no_token_raises_401(self, monkeypatch):
from fastapi import HTTPException
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
request = MagicMock()
request.headers.get = lambda k, d="": ""
request.cookies.get = lambda k: None
with pytest.raises(HTTPException) as exc:
await auth.require_auth(request)
assert exc.value.status_code == 401
@pytest.mark.asyncio
async def test_invalid_token_raises_401(self, monkeypatch):
from fastapi import HTTPException
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
async def fake_validate(t): return None
monkeypatch.setattr(auth, "_validate_token", fake_validate)
request = MagicMock()
request.headers.get = lambda k, d="": "Bearer bad-token"
request.cookies.get = lambda k: None
with pytest.raises(HTTPException) as exc:
await auth.require_auth(request)
assert exc.value.status_code == 401
@pytest.mark.asyncio
async def test_valid_token_returns_user(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
async def fake_validate(t): return {"sub": "u1", "email": "a@b", "name": "X", "roles": []}
monkeypatch.setattr(auth, "_validate_token", fake_validate)
request = MagicMock()
request.headers.get = lambda k, d="": "Bearer ok-token"
user = await auth.require_auth(request)
assert user["sub"] == "u1"
class TestRequireAdmin:
@pytest.mark.asyncio
async def test_dev_mode_returns_anonymous_admin(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
user = await auth.require_admin(MagicMock())
assert "admin" in user["roles"]
@pytest.mark.asyncio
async def test_admin_role_passes(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
async def fake_require_auth(req): return {"sub": "u1", "roles": ["admin"]}
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
user = await auth.require_admin(MagicMock())
assert "admin" in user["roles"]
@pytest.mark.asyncio
async def test_gwoe_admin_role_passes(self, monkeypatch):
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
async def fake_require_auth(req): return {"sub": "u1", "roles": ["gwoe-admin"]}
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
user = await auth.require_admin(MagicMock())
assert "gwoe-admin" in user["roles"]
@pytest.mark.asyncio
async def test_no_admin_role_raises_403(self, monkeypatch):
from fastapi import HTTPException
from app import auth
from unittest.mock import MagicMock
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
async def fake_require_auth(req): return {"sub": "u1", "roles": ["user"]}
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
with pytest.raises(HTTPException) as exc:
await auth.require_admin(MagicMock())
assert exc.value.status_code == 403
class TestKeycloakAdminToken:
@pytest.mark.asyncio
async def test_no_credentials_raises(self, monkeypatch):
from fastapi import HTTPException
from app import auth
from app.config import settings
monkeypatch.setattr(settings, "keycloak_admin_user", "")
with pytest.raises(HTTPException) as exc:
await auth.keycloak_admin_token()
assert exc.value.status_code == 500
@pytest.mark.asyncio
async def test_returns_access_token_on_success(self, monkeypatch):
from app import auth
from app.config import settings
from unittest.mock import MagicMock, patch
import httpx as _httpx
monkeypatch.setattr(settings, "keycloak_admin_user", "admin")
monkeypatch.setattr(settings, "keycloak_admin_password", "secret")
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
fake_resp = MagicMock(status_code=200,
json=lambda: {"access_token": "TOKEN-123"})
async def fake_post(self, url, data=None, **kw):
return fake_resp
with patch.object(_httpx.AsyncClient, "post", fake_post):
tok = await auth.keycloak_admin_token()
assert tok == "TOKEN-123"
@pytest.mark.asyncio
async def test_keycloak_error_raises_500(self, monkeypatch):
from fastapi import HTTPException
from app import auth
from app.config import settings
from unittest.mock import MagicMock, patch
import httpx as _httpx
monkeypatch.setattr(settings, "keycloak_admin_user", "admin")
monkeypatch.setattr(settings, "keycloak_admin_password", "secret")
fake_resp = MagicMock(status_code=500, text="server error")
async def fake_post(self, url, data=None, **kw):
return fake_resp
with patch.object(_httpx.AsyncClient, "post", fake_post):
with pytest.raises(HTTPException) as exc:
await auth.keycloak_admin_token()
assert exc.value.status_code == 500