"""Tests for app/auth.py — Keycloak JWT authentication (#43, #129). These tests cover the auth module WITHOUT a running Keycloak server: - Token extraction from headers/cookies - Auth-disabled detection (Dev-Modus) - direct_login — Keycloak Direct Access Grant (gemockt via httpx) - _pick_best_title helper (in main.py, tested here for convenience) """ import sys import types # Stub jose if not installed locally if "jose" not in sys.modules: jose_stub = types.ModuleType("jose") jose_jwt = types.ModuleType("jose.jwt") jose_stub.jwt = jose_jwt jose_stub.JWTError = Exception jose_stub.ExpiredSignatureError = Exception jose_jwt.get_unverified_header = lambda t: {"kid": "test"} jose_jwt.decode = lambda *a, **kw: {"sub": "test", "email": "t@t.de"} sys.modules["jose"] = jose_stub sys.modules["jose.jwt"] = jose_jwt import pytest import asyncio from unittest.mock import AsyncMock, MagicMock, patch from fastapi import HTTPException from app.auth import _extract_token, _is_auth_enabled class TestExtractToken: def test_bearer_header(self): req = MagicMock() req.headers = {"authorization": "Bearer abc123"} req.cookies = {} assert _extract_token(req) == "abc123" def test_cookie_fallback(self): req = MagicMock() req.headers = {} req.cookies = {"access_token": "cookie_token"} assert _extract_token(req) == "cookie_token" def test_no_token(self): req = MagicMock() req.headers = {} req.cookies = {} assert _extract_token(req) is None def test_non_bearer_header_ignored(self): req = MagicMock() req.headers = {"authorization": "Basic dXNlcjpwYXNz"} req.cookies = {} assert _extract_token(req) is None class TestIsAuthEnabled: def test_disabled_when_url_empty(self, monkeypatch): from app import config monkeypatch.setattr(config.settings, "keycloak_url", "") monkeypatch.setattr(config.settings, "keycloak_realm", "test") monkeypatch.setattr(config.settings, "keycloak_client_id", "test") assert _is_auth_enabled() is False def test_disabled_when_realm_empty(self, monkeypatch): from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "") monkeypatch.setattr(config.settings, "keycloak_client_id", "test") assert _is_auth_enabled() is False def test_enabled_when_all_set(self, monkeypatch): from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "realm") monkeypatch.setattr(config.settings, "keycloak_client_id", "client") assert _is_auth_enabled() is True class TestDirectLogin: """Tests für direct_login() in auth.py — Keycloak Direct Access Grant (#129). Alle Keycloak-HTTP-Calls werden via unittest.mock.patch gemockt. Kein laufender Keycloak-Server nötig. """ def _run(self, coro): return asyncio.get_event_loop().run_until_complete(coro) def _make_resp(self, status_code: int, body: dict): resp = MagicMock() resp.status_code = status_code resp.json.return_value = body return resp def test_success_returns_token_data(self, monkeypatch): """Bei 200 von Keycloak gibt direct_login das Token-Dict zurück.""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") token_response = { "access_token": "eyABC", "refresh_token": "ryDEF", "expires_in": 300, "refresh_expires_in": 1800, } mock_resp = self._make_resp(200, token_response) async def _mock_post(*args, **kwargs): return mock_resp mock_client = AsyncMock() mock_client.post = _mock_post mock_client.__aenter__ = AsyncMock(return_value=mock_client) mock_client.__aexit__ = AsyncMock(return_value=False) with patch("app.auth.httpx.AsyncClient", return_value=mock_client): from app.auth import direct_login result = self._run(direct_login("user", "pw")) assert result["access_token"] == "eyABC" assert result["refresh_token"] == "ryDEF" assert result["expires_in"] == 300 def test_invalid_credentials_raises_401(self, monkeypatch): """Bei 401 von Keycloak wirft direct_login HTTPException(status_code=401).""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") mock_resp = self._make_resp(401, {"error": "invalid_grant", "error_description": "Ungültige Anmeldedaten"}) mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_resp) mock_client.__aenter__ = AsyncMock(return_value=mock_client) mock_client.__aexit__ = AsyncMock(return_value=False) with patch("app.auth.httpx.AsyncClient", return_value=mock_client): from app.auth import direct_login with pytest.raises(HTTPException) as exc_info: self._run(direct_login("user", "falsch")) assert exc_info.value.status_code == 401 assert "Ungültige Anmeldedaten" in exc_info.value.detail def test_keycloak_error_raises_non_401(self, monkeypatch): """Bei 500 von Keycloak wirft direct_login HTTPException mit dem Keycloak-Statuscode.""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "https://sso.test") monkeypatch.setattr(config.settings, "keycloak_realm", "testrealm") monkeypatch.setattr(config.settings, "keycloak_client_id", "testclient") mock_resp = self._make_resp(500, {"error_description": "Internal Server Error"}) mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_resp) mock_client.__aenter__ = AsyncMock(return_value=mock_client) mock_client.__aexit__ = AsyncMock(return_value=False) with patch("app.auth.httpx.AsyncClient", return_value=mock_client): from app.auth import direct_login with pytest.raises(HTTPException) as exc_info: self._run(direct_login("user", "pw")) assert exc_info.value.status_code == 500 def test_auth_disabled_raises_400(self, monkeypatch): """Wenn Auth nicht konfiguriert ist, wirft direct_login HTTPException(400).""" from app import config monkeypatch.setattr(config.settings, "keycloak_url", "") monkeypatch.setattr(config.settings, "keycloak_realm", "") monkeypatch.setattr(config.settings, "keycloak_client_id", "") from app.auth import direct_login with pytest.raises(HTTPException) as exc_info: self._run(direct_login("user", "pw")) assert exc_info.value.status_code == 400 try: from app.main import _pick_best_title _HAS_MAIN = True except ImportError: _HAS_MAIN = False @pytest.mark.skipif(not _HAS_MAIN, reason="app.main not importable (missing slowapi/etc)") class TestPickBestTitle: """Test _pick_best_title from main.py.""" def test_prefer_real_doc_title(self): assert _pick_best_title( "LLM-Titel", "Echte Antrag-Bezeichnung aus OPAL", "18/123" ) == "Echte Antrag-Bezeichnung aus OPAL" def test_reject_generic_doc_title(self): from app.main import _pick_best_title result = _pick_best_title( "Lehrkräfte stärken", "Drucksache 18/18085", "18/18085" ) assert result == "Lehrkräfte stärken" def test_fallback_to_llm_title(self): assert _pick_best_title("LLM-Titel", None, "18/123") == "LLM-Titel" def test_fallback_to_generic(self): assert _pick_best_title("", None, "18/123") == "Drucksache 18/123" def test_empty_doc_title_uses_llm(self): assert _pick_best_title("Guter LLM-Titel", "", "18/123") == "Guter LLM-Titel" # ─── Coverage-Backfill (#134) ──────────────────────────────────────────────── class TestKeycloakUrls: def test_issuer_includes_realm(self, monkeypatch): from app import auth from app.config import settings monkeypatch.setattr(settings, "keycloak_url", "https://sso.example") monkeypatch.setattr(settings, "keycloak_realm", "myrealm") assert auth._keycloak_issuer() == "https://sso.example/realms/myrealm" def test_jwks_url_appends_certs(self, monkeypatch): from app import auth from app.config import settings monkeypatch.setattr(settings, "keycloak_url", "https://sso.example") monkeypatch.setattr(settings, "keycloak_realm", "myrealm") assert auth._keycloak_jwks_url() == ( "https://sso.example/realms/myrealm/protocol/openid-connect/certs" ) class TestGetJwks: """JWKS-Cache-Verhalten + HTTP-Fehler-Pfad.""" @pytest.mark.asyncio async def test_returns_cached_when_fresh(self, monkeypatch): from app import auth import time as _time # Stelle sicher: Cache ist gesetzt + nicht abgelaufen monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "abc"}]}) monkeypatch.setattr(auth, "_jwks_cache_time", _time.time()) result = await auth._get_jwks() assert result == {"keys": [{"kid": "abc"}]} @pytest.mark.asyncio async def test_fetches_when_cache_empty(self, monkeypatch): from app import auth import httpx as _httpx from unittest.mock import AsyncMock, MagicMock, patch monkeypatch.setattr(auth, "_jwks_cache", {}) monkeypatch.setattr(auth, "_jwks_cache_time", 0) fake_resp = MagicMock(status_code=200, json=lambda: {"keys": [{"kid": "new"}]}) async def fake_get(self, url): return fake_resp with patch.object(_httpx.AsyncClient, "get", fake_get): result = await auth._get_jwks() assert "keys" in result assert result["keys"][0]["kid"] == "new" @pytest.mark.asyncio async def test_http_error_returns_stale_cache(self, monkeypatch): from app import auth import httpx as _httpx from unittest.mock import patch # Stale cache vorhanden monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "old"}]}) monkeypatch.setattr(auth, "_jwks_cache_time", 0) # abgelaufen async def failing_get(self, url): raise _httpx.ConnectError("network down") with patch.object(_httpx.AsyncClient, "get", failing_get): result = await auth._get_jwks() # Stale-Cache wird zurueckgegeben assert result == {"keys": [{"kid": "old"}]} class TestValidateToken: """_validate_token: Schlüssel-Lookup, Payload-Mapping.""" @pytest.mark.asyncio async def test_no_jwks_returns_none(self, monkeypatch): from app import auth async def fake_jwks(): return {} monkeypatch.setattr(auth, "_get_jwks", fake_jwks) result = await auth._validate_token("any-token") assert result is None class TestGetCurrentUser: @pytest.mark.asyncio async def test_returns_none_when_auth_disabled(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False) request = MagicMock(headers={}, cookies={}) assert await auth.get_current_user(request) is None @pytest.mark.asyncio async def test_returns_none_when_no_token(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) request = MagicMock() request.headers.get = lambda k, d="": "" request.cookies.get = lambda k: None assert await auth.get_current_user(request) is None class TestRequireAuth: @pytest.mark.asyncio async def test_dev_mode_returns_anonymous(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False) request = MagicMock() user = await auth.require_auth(request) assert user["sub"] == "anonymous" assert "Dev-Modus" in user["name"] @pytest.mark.asyncio async def test_no_token_raises_401(self, monkeypatch): from fastapi import HTTPException from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) request = MagicMock() request.headers.get = lambda k, d="": "" request.cookies.get = lambda k: None with pytest.raises(HTTPException) as exc: await auth.require_auth(request) assert exc.value.status_code == 401 @pytest.mark.asyncio async def test_invalid_token_raises_401(self, monkeypatch): from fastapi import HTTPException from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) async def fake_validate(t): return None monkeypatch.setattr(auth, "_validate_token", fake_validate) request = MagicMock() request.headers.get = lambda k, d="": "Bearer bad-token" request.cookies.get = lambda k: None with pytest.raises(HTTPException) as exc: await auth.require_auth(request) assert exc.value.status_code == 401 @pytest.mark.asyncio async def test_valid_token_returns_user(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) async def fake_validate(t): return {"sub": "u1", "email": "a@b", "name": "X", "roles": []} monkeypatch.setattr(auth, "_validate_token", fake_validate) request = MagicMock() request.headers.get = lambda k, d="": "Bearer ok-token" user = await auth.require_auth(request) assert user["sub"] == "u1" class TestRequireAdmin: @pytest.mark.asyncio async def test_dev_mode_returns_anonymous_admin(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False) user = await auth.require_admin(MagicMock()) assert "admin" in user["roles"] @pytest.mark.asyncio async def test_admin_role_passes(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) async def fake_require_auth(req): return {"sub": "u1", "roles": ["admin"]} monkeypatch.setattr(auth, "require_auth", fake_require_auth) user = await auth.require_admin(MagicMock()) assert "admin" in user["roles"] @pytest.mark.asyncio async def test_gwoe_admin_role_passes(self, monkeypatch): from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) async def fake_require_auth(req): return {"sub": "u1", "roles": ["gwoe-admin"]} monkeypatch.setattr(auth, "require_auth", fake_require_auth) user = await auth.require_admin(MagicMock()) assert "gwoe-admin" in user["roles"] @pytest.mark.asyncio async def test_no_admin_role_raises_403(self, monkeypatch): from fastapi import HTTPException from app import auth from unittest.mock import MagicMock monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True) async def fake_require_auth(req): return {"sub": "u1", "roles": ["user"]} monkeypatch.setattr(auth, "require_auth", fake_require_auth) with pytest.raises(HTTPException) as exc: await auth.require_admin(MagicMock()) assert exc.value.status_code == 403 class TestKeycloakAdminToken: @pytest.mark.asyncio async def test_no_credentials_raises(self, monkeypatch): from fastapi import HTTPException from app import auth from app.config import settings monkeypatch.setattr(settings, "keycloak_admin_user", "") with pytest.raises(HTTPException) as exc: await auth.keycloak_admin_token() assert exc.value.status_code == 500 @pytest.mark.asyncio async def test_returns_access_token_on_success(self, monkeypatch): from app import auth from app.config import settings from unittest.mock import MagicMock, patch import httpx as _httpx monkeypatch.setattr(settings, "keycloak_admin_user", "admin") monkeypatch.setattr(settings, "keycloak_admin_password", "secret") monkeypatch.setattr(settings, "keycloak_url", "https://sso.example") fake_resp = MagicMock(status_code=200, json=lambda: {"access_token": "TOKEN-123"}) async def fake_post(self, url, data=None, **kw): return fake_resp with patch.object(_httpx.AsyncClient, "post", fake_post): tok = await auth.keycloak_admin_token() assert tok == "TOKEN-123" @pytest.mark.asyncio async def test_keycloak_error_raises_500(self, monkeypatch): from fastapi import HTTPException from app import auth from app.config import settings from unittest.mock import MagicMock, patch import httpx as _httpx monkeypatch.setattr(settings, "keycloak_admin_user", "admin") monkeypatch.setattr(settings, "keycloak_admin_password", "secret") fake_resp = MagicMock(status_code=500, text="server error") async def fake_post(self, url, data=None, **kw): return fake_resp with patch.object(_httpx.AsyncClient, "post", fake_post): with pytest.raises(HTTPException) as exc: await auth.keycloak_admin_token() assert exc.value.status_code == 500