Authentication in Castlecraft Engineer
Authentication in castlecraft-engineer is handled by the AuthenticationService located in castlecraft_engineer.application.auth. This service is responsible for verifying user identity, primarily through token-based mechanisms. It integrates caching for performance and supports standard OAuth 2.0 and OpenID Connect (OIDC) mechanisms: JWKS fetching for token signature verification, token introspection, and fetching user information.
Core AuthenticationService Functionality
The AuthenticationService is designed to:
* Verify tokens against a JSON Web Key Set (JWKS).
* Introspect tokens via a dedicated endpoint.
* Fetch user information from a UserInfo endpoint.
* Utilize caching (synchronous and asynchronous Redis) to store JWKS responses, verified token payloads, and introspection results to reduce external calls.
Initialization
The service can be initialized with optional synchronous and asynchronous Redis cache clients:
# from redis import Redis
# from redis.asyncio import Redis as AsyncRedis
from castlecraft_engineer.application.auth import AuthenticationService
# sync_cache = Redis(host='localhost', port=6379, db=0)
# async_cache = AsyncRedis(host='localhost', port=6379, db=0)
auth_service = AuthenticationService(
    # cache_client=sync_cache,
    # async_cache_client=async_cache,
)
Caching Strategy
The service employs caching for:
* JWKS Response: Fetched from ENV_JWKS_URL and cached with a TTL (JWKS_TTL_SEC, default 10800 s, i.e. 3 hours).
* Verified Token Payloads: After successful ID token verification or token introspection, the resulting user/token data is cached. The cache key is BEARER_TOKEN_KEY_PREFIX (default "bearer_token|") followed by the token itself. The TTL is derived from the token's exp claim or defaults to DEFAULT_TOKEN_TTL_SEC (default 3600 s, i.e. 1 hour).
Cached objects are serialized and deserialized with pickle. Because unpickling untrusted data can execute arbitrary code, ensure that your Redis instance is secured and that only trusted sources can write to it.
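The key and TTL rules above can be sketched with the standard library alone. This is a minimal illustration, not the service's own code: token_cache_key and ttl_from_payload are hypothetical helper names, and the floor of one second for already-expired tokens is an assumption made here so the example never produces a non-positive TTL.

```python
import pickle
import time
from typing import Any, Dict, Optional

# Defaults quoted from the text above.
BEARER_TOKEN_KEY_PREFIX = "bearer_token|"
DEFAULT_TOKEN_TTL_SEC = 3600


def token_cache_key(token: str) -> str:
    """Hypothetical helper: the prefix followed by the raw token."""
    return BEARER_TOKEN_KEY_PREFIX + token


def ttl_from_payload(payload: Dict[str, Any], now: Optional[float] = None) -> int:
    """Hypothetical helper: seconds until `exp`, else the default TTL."""
    now = time.time() if now is None else now
    exp = payload.get("exp")
    if exp is None:
        return DEFAULT_TOKEN_TTL_SEC
    # Assumption: clamp to at least 1 second so the TTL is always positive.
    return max(int(exp - now), 1)


payload = {"sub": "user-1", "exp": 1_700_000_120}
key = token_cache_key("abc.def.ghi")
ttl = ttl_from_payload(payload, now=1_700_000_000)

# What would be written to and read back from the cache under `key`.
blob = pickle.dumps(payload)
restored = pickle.loads(blob)  # only safe for data you wrote yourself
```

With a real Redis client, blob would typically be stored with an expiry equal to ttl.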
Key Operations
1. Fetching JWKS: get_active_jwks_response()
This asynchronous method retrieves the JSON Web Key Set.
1. Checks the cache (async or sync) for JWKS_RESPONSE_KEY.
2. If cached and valid (checked by _is_jwks_valid), returns the cached JWKS.
3. If not cached or invalid, fetches from the URL specified by the ENV_JWKS_URL environment variable.
4. Validates the fetched JWKS structure.
5. Caches the valid JWKS response.
Returns None if ENV_JWKS_URL is not set or if fetching/validation fails.
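The check-cache, fetch, validate, store sequence above is a cache-aside pattern. The sketch below shows it synchronously with an in-memory dict and an injected fetch callable so it runs without Redis or a network. The names looks_like_jwks and get_jwks are hypothetical, and the structural check is an assumption; the real _is_jwks_valid may apply different rules.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

JWKS_TTL_SEC = 10800  # default quoted in the text above


def looks_like_jwks(data: Any) -> bool:
    """Assumed structural check: a dict whose "keys" is a non-empty
    list of dicts that each carry a "kty" member."""
    if not isinstance(data, dict):
        return False
    keys = data.get("keys")
    if not isinstance(keys, list) or not keys:
        return False
    return all(isinstance(k, dict) and "kty" in k for k in keys)


def get_jwks(
    cache: Dict[str, Tuple[float, Any]],
    fetch: Callable[[], Any],
    now: Optional[float] = None,
) -> Optional[Dict[str, Any]]:
    """Return a cached JWKS if still fresh and valid, else fetch and store it."""
    now = time.time() if now is None else now
    entry = cache.get("jwks")
    if entry is not None:
        expires_at, cached = entry
        if expires_at > now and looks_like_jwks(cached):
            return cached
    fetched = fetch()
    if not looks_like_jwks(fetched):
        return None  # mirrors the "returns None on failure" contract
    cache["jwks"] = (now + JWKS_TTL_SEC, fetched)
    return fetched
```

Passing the fetch function in as a parameter keeps the example testable; in the service the fetch would be an HTTP GET against the configured JWKS URL.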
2. Verifying ID Tokens: verify_id_token(token: str)
This asynchronous method verifies an ID token (typically a JWT) signed by an external Identity Provider.
1. Fetches the active JWKS using get_active_jwks_response().
2. Extracts the kid (Key ID) from the unverified token header.
3. Finds the corresponding public key in the JWKS.
4. Decodes and verifies the token using the public key, checking the signature and the expiration (exp), audience (aud, from ENV_ALLOWED_AUD), issued at (iat), and not before (nbf) claims. A leeway of 60 seconds is applied for clock skew.
5. If verification is successful, the decoded user payload is cached (prefixed with BEARER_TOKEN_KEY_PREFIX) with a TTL based on the token's exp claim.
Returns the decoded user payload (a dictionary) upon success, or None upon failure. Any cached entry for the token is deleted on verification failure.
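Steps 2 and 3, plus the time-claim part of step 4, can be illustrated without a JWT library. The sketch below only decodes the header and compares timestamps; it performs no signature or audience verification, so it is not a substitute for the real method. All function names are hypothetical, and the exact comparison boundaries are an assumption.

```python
import base64
import json
from typing import Any, Dict, Optional


def unverified_header(token: str) -> Dict[str, Any]:
    """Decode the JWT header segment WITHOUT verifying the signature."""
    header_b64 = token.split(".")[0]
    # base64url segments in JWTs have their "=" padding stripped; restore it.
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def find_key(jwks: Dict[str, Any], kid: Optional[str]) -> Optional[Dict[str, Any]]:
    """Return the JWK whose "kid" matches, or None."""
    if kid is None:
        return None
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    return None


def time_claims_ok(claims: Dict[str, Any], now: float, leeway: int = 60) -> bool:
    """Check exp, nbf and iat with a clock-skew allowance (assumed semantics)."""
    if "exp" in claims and claims["exp"] <= now - leeway:
        return False  # expired, even after allowing for skew
    if "nbf" in claims and claims["nbf"] > now + leeway:
        return False  # not yet valid
    if "iat" in claims and claims["iat"] > now + leeway:
        return False  # issued in the future
    return True
```

The header is read before verification only to select the key; nothing in it should be trusted until the signature has been checked with that key.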
3. Introspecting Tokens: introspect_token(token: str)
This asynchronous method validates a token by calling an OAuth 2.0 Token Introspection endpoint.
1. Retrieves the introspection endpoint URL from ENV_INTROSPECT_URL.
2. Constructs a POST request with the token (using the key from ENV_INTROSPECT_TOKEN_KEY or DEFAULT_INTROSPECT_TOKEN_KEY).
3. Optionally includes client credentials (from ENV_CLIENT_ID, ENV_CLIENT_SECRET) if ENV_INTROSPECT_REQUIRES_AUTH is true.
4. If the introspection response indicates the token is active (active: true):
   a. The introspection response (a dictionary) is cached (prefixed with BEARER_TOKEN_KEY_PREFIX) with a TTL based on its exp claim.
   b. If ENV_ENABLE_FETCH_USERINFO is true and ENV_USERINFO_URL is set, it then calls fetch_userinfo().
Returns the introspection response (possibly merged with userinfo) or None if introspection fails or the token is inactive. The cache is cleared for the token if introspection indicates inactivity.
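The request described in steps 2 and 3 might look like the following. This sketch only builds the request object with urllib and does not send it. Sending client credentials as HTTP Basic authentication is an assumption; the text above does not say how the service transmits them. build_introspection_request and is_active are hypothetical names.

```python
import base64
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

DEFAULT_INTROSPECT_TOKEN_KEY = "token"  # default named in the configuration list


def build_introspection_request(
    url: str,
    token: str,
    token_key: str = DEFAULT_INTROSPECT_TOKEN_KEY,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a form-encoded POST carrying the token."""
    body = urllib.parse.urlencode({token_key: token}).encode("ascii")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    if client_id is not None and client_secret is not None:
        # Assumption: credentials are sent via HTTP Basic authentication.
        raw = f"{client_id}:{client_secret}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def is_active(response: Dict[str, Any]) -> bool:
    """Treat only an explicit boolean true as an active token."""
    return response.get("active") is True
```

Checking for an explicit True avoids accepting truthy strings such as "false" from a misbehaving endpoint.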
4. Fetching UserInfo: fetch_userinfo(userinfo_url: str, token: str)
This synchronous method fetches user information from the OIDC UserInfo endpoint.
1. Makes a GET request to the provided userinfo_url with the access token in the Authorization: Bearer <token> header.
Returns the userinfo response (a dictionary) or None on failure.
Note: This method is called internally by introspect_token if configured, and its result is merged with the introspection data before caching.
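As a rough illustration of the request and the merge, the sketch below builds the GET request with urllib (without sending it) and combines two dictionaries. The helper names are hypothetical, and letting userinfo values override introspection values on key collisions is an assumption; the text above does not specify the precedence.

```python
import urllib.request
from typing import Any, Dict, Optional


def build_userinfo_request(userinfo_url: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request with a Bearer token."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
    }
    return urllib.request.Request(userinfo_url, headers=headers, method="GET")


def merge_userinfo(
    introspection: Dict[str, Any],
    userinfo: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Return a new dict; userinfo keys win on collision (assumed precedence)."""
    merged = dict(introspection)
    if userinfo:
        merged.update(userinfo)
    return merged
```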
5. Unified Token Verification: verify_user(token: str)
This is the primary asynchronous method applications should use to verify a token and get user data.
1. Checks the cache (async or sync) for the token (key: BEARER_TOKEN_KEY_PREFIX + token).
2. If a valid (non-expired) cached entry exists, it's returned.
3. If cached data is expired, it's deleted from the cache.
4. If not cached or expired:
   a. If ENV_ENABLE_VERIFY_ID_TOKEN is true, it attempts verify_id_token(token). If successful, the result is returned.
   b. If ID token verification is disabled or fails, and if ENV_ENABLE_INTROSPECT_TOKEN is true, it attempts introspect_token(token). If successful, the result is returned.
5. Returns None if all verification methods fail.
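The fallback order above can be sketched as a small coroutine. This is an illustration under assumptions, not the service's implementation: the cache is a plain dict, the two verifiers are injected as optional coroutines (passing None stands in for a disabled flag), and verify_user_sketch is a hypothetical name.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Optional

BEARER_TOKEN_KEY_PREFIX = "bearer_token|"  # default quoted earlier in the text

Verifier = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]


async def verify_user_sketch(
    token: str,
    cache: Dict[str, Dict[str, Any]],
    verify_id_token: Optional[Verifier] = None,   # None models a disabled flow
    introspect_token: Optional[Verifier] = None,  # None models a disabled flow
    now: Optional[float] = None,
) -> Optional[Dict[str, Any]]:
    now = time.time() if now is None else now
    key = BEARER_TOKEN_KEY_PREFIX + token

    cached = cache.get(key)
    if cached is not None:
        exp = cached.get("exp")
        if exp is None or exp > now:
            return cached  # valid cache hit
        del cache[key]  # expired entry is evicted

    # Try ID token verification first, then introspection.
    for verifier in (verify_id_token, introspect_token):
        if verifier is None:
            continue
        result = await verifier(token)
        if result:
            return result
    return None
```

A caller would run it with asyncio.run(verify_user_sketch(token, cache, ...)) or await it from an existing event loop.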
Configuration (Environment Variables)
The service relies heavily on environment variables for its configuration:
* JWKS_URL: URL to fetch the JSON Web Key Set.
* ALLOWED_AUD: Comma-separated list of allowed audiences for ID token verification.
* INTROSPECT_URL: URL of the token introspection endpoint.
* INTROSPECT_TOKEN_KEY: The key name for the token in the introspection request form data (default: token).
* INTROSPECT_REQUIRES_AUTH: Set to true if the introspection endpoint requires client authentication (default: false).
* CLIENT_ID: Client ID for introspection endpoint authentication.
* CLIENT_SECRET: Client secret for introspection endpoint authentication.
* USERINFO_URL: URL of the UserInfo endpoint.
* ENABLE_VERIFY_ID_TOKEN: Set to true to enable ID token verification flow (default: false).
* ENABLE_INTROSPECT_TOKEN: Set to true to enable token introspection flow (default: false).
* ENABLE_FETCH_USERINFO: Set to true to enable fetching userinfo after successful introspection (default: false).
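To make the variable list concrete, the sketch below reads these settings from a mapping such as os.environ. It is a hypothetical helper, not part of the library: the function names are invented here, and treating only the case-insensitive string "true" as enabled is an assumption about how the flags are parsed.

```python
import os
from typing import Any, Dict, List, Mapping


def env_flag(environ: Mapping[str, str], name: str, default: bool = False) -> bool:
    """Assumed parsing: only "true" (any case) enables a flag."""
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() == "true"


def env_csv(environ: Mapping[str, str], name: str) -> List[str]:
    """Split a comma-separated value, dropping blanks and whitespace."""
    raw = environ.get(name, "")
    return [item.strip() for item in raw.split(",") if item.strip()]


def load_auth_settings(environ: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Collect the variables listed above into one dictionary."""
    return {
        "jwks_url": environ.get("JWKS_URL"),
        "allowed_aud": env_csv(environ, "ALLOWED_AUD"),
        "introspect_url": environ.get("INTROSPECT_URL"),
        "introspect_token_key": environ.get("INTROSPECT_TOKEN_KEY", "token"),
        "introspect_requires_auth": env_flag(environ, "INTROSPECT_REQUIRES_AUTH"),
        "client_id": environ.get("CLIENT_ID"),
        "client_secret": environ.get("CLIENT_SECRET"),
        "userinfo_url": environ.get("USERINFO_URL"),
        "enable_verify_id_token": env_flag(environ, "ENABLE_VERIFY_ID_TOKEN"),
        "enable_introspect_token": env_flag(environ, "ENABLE_INTROSPECT_TOKEN"),
        "enable_fetch_userinfo": env_flag(environ, "ENABLE_FETCH_USERINFO"),
    }
```

Accepting a mapping instead of reading os.environ directly makes the helper easy to exercise in tests with a plain dict.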
Usage in an Application
- Setup: Ensure Redis (sync and/or async) is available if caching is desired. Set the necessary environment variables.
- Instantiation: Create an instance of AuthenticationService, providing cache clients if used.
- Dependency Injection: Register the AuthenticationService instance with your DI container.
- Protecting Endpoints: In your web framework (e.g., FastAPI), create a dependency that retrieves the token from the request (e.g., Authorization: Bearer <token>) and calls await auth_service.verify_user(token).
  - If verify_user returns a user dictionary, authentication is successful. The user data can be passed to the route handler.
  - If verify_user returns None, authentication fails. An appropriate HTTP error (e.g., 401 Unauthorized) should be returned.
# Conceptual FastAPI dependency example
from typing import Any, Dict

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

from castlecraft_engineer.application.auth import AuthenticationService

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")  # Example token URL


async def get_current_verified_user(
    token: str = Depends(oauth2_scheme),
    auth_service: AuthenticationService = Depends(),  # Assuming DI setup
) -> Dict[str, Any]:  # Returns the user payload dict; raises 401 on failure
    user_payload = await auth_service.verify_user(token)
    if not user_payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # You might want to transform user_payload into an AuthenticatedSubject-like object here
    # or perform additional checks (e.g., user active status).
    return user_payload


# Example FastAPI route
# @app.get("/users/me")
# async def read_users_me(current_user: Dict[str, Any] = Depends(get_current_verified_user)):
#     return current_user
The user dictionary returned by verify_user contains the claims from the token (if ID token verification succeeded) or the introspection response (possibly merged with userinfo). This data can then be used for authorization decisions.
Error Handling
The service logs errors during its operations (e.g., HTTP errors, JSON decoding errors, cache errors). Applications should treat a None return from verify_user as an authentication failure and report it to the client.