test(#134): auth.py Coverage 47.1% → 86%
Security-kritisch — jetzt mit umfassender Test-Abdeckung: - TestKeycloakUrls: issuer + jwks-URL-Konstruktion - TestGetJwks: Cache-Hit (frisch), Fetch bei leerem Cache, Stale-Cache bei HTTP-Fehler (statt komplettem Crash) - TestValidateToken: kein JWKS → None - TestGetCurrentUser: Auth-disabled → None, kein Token → None - TestRequireAuth: Dev-Modus, 401 ohne Token, 401 ungueltig, 200 mit validem Token - TestRequireAdmin: Dev-admin, admin-Rolle, gwoe-admin-Rolle, 403 ohne Admin-Rolle - TestKeycloakAdminToken: keine Credentials → 500, Erfolg → access_token, Keycloak-Fehler → 500 Verbleibend: kid-not-found-Pfad, ExpiredSignature/JWTError/ImportError- Branches im _validate_token-Inneren — wuerden voll gemockten jose-Stack brauchen. Total Coverage: 51.2% → 52.1%, 750 → 769 Tests.
This commit is contained in:
parent
3edb1e7501
commit
58bfc84c41
@ -212,3 +212,257 @@ class TestPickBestTitle:
|
|||||||
|
|
||||||
def test_empty_doc_title_uses_llm(self):
|
def test_empty_doc_title_uses_llm(self):
|
||||||
assert _pick_best_title("Guter LLM-Titel", "", "18/123") == "Guter LLM-Titel"
|
assert _pick_best_title("Guter LLM-Titel", "", "18/123") == "Guter LLM-Titel"
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Coverage-Backfill (#134) ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestKeycloakUrls:
|
||||||
|
def test_issuer_includes_realm(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from app.config import settings
|
||||||
|
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
|
||||||
|
monkeypatch.setattr(settings, "keycloak_realm", "myrealm")
|
||||||
|
assert auth._keycloak_issuer() == "https://sso.example/realms/myrealm"
|
||||||
|
|
||||||
|
def test_jwks_url_appends_certs(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from app.config import settings
|
||||||
|
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
|
||||||
|
monkeypatch.setattr(settings, "keycloak_realm", "myrealm")
|
||||||
|
assert auth._keycloak_jwks_url() == (
|
||||||
|
"https://sso.example/realms/myrealm/protocol/openid-connect/certs"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetJwks:
|
||||||
|
"""JWKS-Cache-Verhalten + HTTP-Fehler-Pfad."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_cached_when_fresh(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
import time as _time
|
||||||
|
# Stelle sicher: Cache ist gesetzt + nicht abgelaufen
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "abc"}]})
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache_time", _time.time())
|
||||||
|
|
||||||
|
result = await auth._get_jwks()
|
||||||
|
assert result == {"keys": [{"kid": "abc"}]}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetches_when_cache_empty(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
import httpx as _httpx
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache", {})
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache_time", 0)
|
||||||
|
|
||||||
|
fake_resp = MagicMock(status_code=200, json=lambda: {"keys": [{"kid": "new"}]})
|
||||||
|
|
||||||
|
async def fake_get(self, url):
|
||||||
|
return fake_resp
|
||||||
|
|
||||||
|
with patch.object(_httpx.AsyncClient, "get", fake_get):
|
||||||
|
result = await auth._get_jwks()
|
||||||
|
assert "keys" in result
|
||||||
|
assert result["keys"][0]["kid"] == "new"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_http_error_returns_stale_cache(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
import httpx as _httpx
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
# Stale cache vorhanden
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache", {"keys": [{"kid": "old"}]})
|
||||||
|
monkeypatch.setattr(auth, "_jwks_cache_time", 0) # abgelaufen
|
||||||
|
|
||||||
|
async def failing_get(self, url):
|
||||||
|
raise _httpx.ConnectError("network down")
|
||||||
|
|
||||||
|
with patch.object(_httpx.AsyncClient, "get", failing_get):
|
||||||
|
result = await auth._get_jwks()
|
||||||
|
# Stale-Cache wird zurueckgegeben
|
||||||
|
assert result == {"keys": [{"kid": "old"}]}
|
||||||
|
|
||||||
|
|
||||||
|
class TestValidateToken:
|
||||||
|
"""_validate_token: Schlüssel-Lookup, Payload-Mapping."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_jwks_returns_none(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
async def fake_jwks():
|
||||||
|
return {}
|
||||||
|
monkeypatch.setattr(auth, "_get_jwks", fake_jwks)
|
||||||
|
result = await auth._validate_token("any-token")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetCurrentUser:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_none_when_auth_disabled(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
|
||||||
|
request = MagicMock(headers={}, cookies={})
|
||||||
|
assert await auth.get_current_user(request) is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_none_when_no_token(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
request = MagicMock()
|
||||||
|
request.headers.get = lambda k, d="": ""
|
||||||
|
request.cookies.get = lambda k: None
|
||||||
|
assert await auth.get_current_user(request) is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequireAuth:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dev_mode_returns_anonymous(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
|
||||||
|
request = MagicMock()
|
||||||
|
user = await auth.require_auth(request)
|
||||||
|
assert user["sub"] == "anonymous"
|
||||||
|
assert "Dev-Modus" in user["name"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_token_raises_401(self, monkeypatch):
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
request = MagicMock()
|
||||||
|
request.headers.get = lambda k, d="": ""
|
||||||
|
request.cookies.get = lambda k: None
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
await auth.require_auth(request)
|
||||||
|
assert exc.value.status_code == 401
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invalid_token_raises_401(self, monkeypatch):
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
async def fake_validate(t): return None
|
||||||
|
monkeypatch.setattr(auth, "_validate_token", fake_validate)
|
||||||
|
request = MagicMock()
|
||||||
|
request.headers.get = lambda k, d="": "Bearer bad-token"
|
||||||
|
request.cookies.get = lambda k: None
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
await auth.require_auth(request)
|
||||||
|
assert exc.value.status_code == 401
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_valid_token_returns_user(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
async def fake_validate(t): return {"sub": "u1", "email": "a@b", "name": "X", "roles": []}
|
||||||
|
monkeypatch.setattr(auth, "_validate_token", fake_validate)
|
||||||
|
request = MagicMock()
|
||||||
|
request.headers.get = lambda k, d="": "Bearer ok-token"
|
||||||
|
user = await auth.require_auth(request)
|
||||||
|
assert user["sub"] == "u1"
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequireAdmin:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dev_mode_returns_anonymous_admin(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: False)
|
||||||
|
user = await auth.require_admin(MagicMock())
|
||||||
|
assert "admin" in user["roles"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_admin_role_passes(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
async def fake_require_auth(req): return {"sub": "u1", "roles": ["admin"]}
|
||||||
|
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
|
||||||
|
user = await auth.require_admin(MagicMock())
|
||||||
|
assert "admin" in user["roles"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_gwoe_admin_role_passes(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
async def fake_require_auth(req): return {"sub": "u1", "roles": ["gwoe-admin"]}
|
||||||
|
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
|
||||||
|
user = await auth.require_admin(MagicMock())
|
||||||
|
assert "gwoe-admin" in user["roles"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_admin_role_raises_403(self, monkeypatch):
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from app import auth
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
monkeypatch.setattr(auth, "_is_auth_enabled", lambda: True)
|
||||||
|
async def fake_require_auth(req): return {"sub": "u1", "roles": ["user"]}
|
||||||
|
monkeypatch.setattr(auth, "require_auth", fake_require_auth)
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
await auth.require_admin(MagicMock())
|
||||||
|
assert exc.value.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
class TestKeycloakAdminToken:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_credentials_raises(self, monkeypatch):
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from app import auth
|
||||||
|
from app.config import settings
|
||||||
|
monkeypatch.setattr(settings, "keycloak_admin_user", "")
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
await auth.keycloak_admin_token()
|
||||||
|
assert exc.value.status_code == 500
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_access_token_on_success(self, monkeypatch):
|
||||||
|
from app import auth
|
||||||
|
from app.config import settings
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
import httpx as _httpx
|
||||||
|
|
||||||
|
monkeypatch.setattr(settings, "keycloak_admin_user", "admin")
|
||||||
|
monkeypatch.setattr(settings, "keycloak_admin_password", "secret")
|
||||||
|
monkeypatch.setattr(settings, "keycloak_url", "https://sso.example")
|
||||||
|
|
||||||
|
fake_resp = MagicMock(status_code=200,
|
||||||
|
json=lambda: {"access_token": "TOKEN-123"})
|
||||||
|
|
||||||
|
async def fake_post(self, url, data=None, **kw):
|
||||||
|
return fake_resp
|
||||||
|
|
||||||
|
with patch.object(_httpx.AsyncClient, "post", fake_post):
|
||||||
|
tok = await auth.keycloak_admin_token()
|
||||||
|
assert tok == "TOKEN-123"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_keycloak_error_raises_500(self, monkeypatch):
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from app import auth
|
||||||
|
from app.config import settings
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
import httpx as _httpx
|
||||||
|
|
||||||
|
monkeypatch.setattr(settings, "keycloak_admin_user", "admin")
|
||||||
|
monkeypatch.setattr(settings, "keycloak_admin_password", "secret")
|
||||||
|
|
||||||
|
fake_resp = MagicMock(status_code=500, text="server error")
|
||||||
|
|
||||||
|
async def fake_post(self, url, data=None, **kw):
|
||||||
|
return fake_resp
|
||||||
|
|
||||||
|
with patch.object(_httpx.AsyncClient, "post", fake_post):
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
await auth.keycloak_admin_token()
|
||||||
|
assert exc.value.status_code == 500
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user