Skip to content

Authentication in Castlecraft Engineer

Authentication in castlecraft-engineer is handled by the AuthenticationService located in castlecraft_engineer.application.auth. This service is responsible for verifying user identity, primarily through token-based mechanisms. It integrates caching for performance and supports standard OpenID Connect (OIDC) flows like JWKS fetching for token signature verification, token introspection, and fetching user information.

Core AuthenticationService Functionality

The AuthenticationService is designed to: * Verify tokens against a JSON Web Key Set (JWKS). * Introspect tokens via a dedicated endpoint. * Fetch user information from a UserInfo endpoint. * Utilize caching (synchronous and asynchronous Redis) to store JWKS responses, verified token payloads, and introspection results to reduce external calls.

Initialization

The service can be initialized with optional synchronous and asynchronous Redis cache clients:

# from redis import Redis
# from redis.asyncio import Redis as AsyncRedis
from castlecraft_engineer.application.auth import AuthenticationService

# sync_cache = Redis(host='localhost', port=6379, db=0)
# async_cache = AsyncRedis(host='localhost', port=6379, db=0)

auth_service = AuthenticationService(
    # cache_client=sync_cache,
    # async_cache_client=async_cache
)

Caching Strategy

The service employs caching for: * JWKS Response: Fetched from ENV_JWKS_URL and cached with a TTL (JWKS_TTL_SEC, default 10800s). * Verified Token Payloads: After successful ID token verification or token introspection, the resulting user/token data is cached. The cache key is prefixed with BEARER_TOKEN_KEY_PREFIX (default "bearer_token|") followed by the token itself. The TTL is derived from the token's exp claim or defaults to DEFAULT_TOKEN_TTL_SEC (default 3600s). Pickle is used for serialization/deserialization of cached objects. Ensure that your Redis instance is secured and data sources are trusted.

Key Operations

1. Fetching JWKS: get_active_jwks_response()

This asynchronous method retrieves the JSON Web Key Set. 1. Checks the cache (async or sync) for JWKS_RESPONSE_KEY. 2. If cached and valid (checked by _is_jwks_valid), returns the cached JWKS. 3. If not cached or invalid, fetches from the URL specified by the ENV_JWKS_URL environment variable. 4. Validates the fetched JWKS structure. 5. Caches the valid JWKS response. Returns None if ENV_JWKS_URL is not set or if fetching/validation fails.

2. Verifying ID Tokens: verify_id_token(token: str)

This asynchronous method verifies an ID token (typically a JWT) signed by an external Identity Provider. 1. Fetches the active JWKS using get_active_jwks_response(). 2. Extracts the kid (Key ID) from the unverified token header. 3. Finds the corresponding public key in the JWKS. 4. Decodes and verifies the token using the public key, checking signature, expiration (exp), audience (aud from ENV_ALLOWED_AUD), issued at (iat), and not before (nbf) claims. A leeway of 60 seconds is applied for clock skew. 5. If verification is successful, the decoded user payload is cached (prefixed with BEARER_TOKEN_KEY_PREFIX) with a TTL based on the token's exp claim. Returns the decoded user payload (a dictionary) upon success, or None upon failure. The cached token is deleted on verification failure.

3. Introspecting Tokens: introspect_token(token: str)

This asynchronous method validates a token by calling an OAuth 2.0 Token Introspection endpoint. 1. Retrieves the introspection endpoint URL from ENV_INTROSPECT_URL. 2. Constructs a POST request with the token (using the key from ENV_INTROSPECT_TOKEN_KEY or DEFAULT_INTROSPECT_TOKEN_KEY). 3. Optionally includes client credentials (from ENV_CLIENT_ID, ENV_CLIENT_SECRET) if ENV_INTROSPECT_REQUIRES_AUTH is true. 4. If the introspection response indicates the token is active (active: true): a. The introspection response (a dictionary) is cached (prefixed with BEARER_TOKEN_KEY_PREFIX) with a TTL based on its exp claim. b. If ENV_ENABLE_FETCH_USERINFO is true and ENV_USERINFO_URL is set, it then calls fetch_userinfo(). Returns the introspection response (possibly merged with userinfo) or None if introspection fails or the token is inactive. The cache is cleared for the token if introspection indicates inactivity.

4. Fetching UserInfo: fetch_userinfo(userinfo_url: str, token: str)

This synchronous method fetches user information from the OIDC UserInfo endpoint. 1. Makes a GET request to the provided userinfo_url with the access token in the Authorization: Bearer <token> header. Returns the userinfo response (a dictionary) or None on failure. Note: This method is called internally by introspect_token if configured, and its result is merged with the introspection data before caching.

5. Unified Token Verification: verify_user(token: str)

This is the primary asynchronous method applications should use to verify a token and get user data. 1. Checks the cache (async or sync) for the token (key: BEARER_TOKEN_KEY_PREFIX + token). 2. If a valid (non-expired) cached entry exists, it's returned. 3. If cached data is expired, it's deleted from the cache. 4. If not cached or expired: a. If ENV_ENABLE_VERIFY_ID_TOKEN is true, it attempts verify_id_token(token). If successful, the result is returned. b. If ID token verification is disabled or fails, and if ENV_ENABLE_INTROSPECT_TOKEN is true, it attempts introspect_token(token). If successful, the result is returned. 5. Returns None if all verification methods fail.

Configuration (Environment Variables)

The service heavily relies on environment variables for its configuration:

  • JWKS_URL: URL to fetch the JSON Web Key Set.
  • ALLOWED_AUD: Comma-separated list of allowed audiences for ID token verification.
  • INTROSPECT_URL: URL of the token introspection endpoint.
  • INTROSPECT_TOKEN_KEY: The key name for the token in the introspection request form data (default: token).
  • INTROSPECT_REQUIRES_AUTH: Set to true if the introspection endpoint requires client authentication (default: false).
  • CLIENT_ID: Client ID for introspection endpoint authentication.
  • CLIENT_SECRET: Client secret for introspection endpoint authentication.
  • USERINFO_URL: URL of the UserInfo endpoint.
  • ENABLE_VERIFY_ID_TOKEN: Set to true to enable ID token verification flow (default: false).
  • ENABLE_INTROSPECT_TOKEN: Set to true to enable token introspection flow (default: false).
  • ENABLE_FETCH_USERINFO: Set to true to enable fetching userinfo after successful introspection (default: false).

Usage in an Application

  1. Setup: Ensure Redis (sync and/or async) is available if caching is desired. Set the necessary environment variables.
  2. Instantiation: Create an instance of AuthenticationService, providing cache clients if used.
  3. Dependency Injection: Register the AuthenticationService instance with your DI container.
  4. Protecting Endpoints: In your web framework (e.g., FastAPI), create a dependency that retrieves the token from the request (e.g., Authorization: Bearer <token>) and calls await auth_service.verify_user(token).
    • If verify_user returns a user dictionary, authentication is successful. The user data can be passed to the route handler.
    • If verify_user returns None, authentication fails. An appropriate HTTP error (e.g., 401 Unauthorized) should be returned.

# Conceptual FastAPI dependency example
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from castlecraft_engineer.application.auth import AuthenticationService
from typing import Optional, Dict, Any

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token") # Example token URL

async def get_current_verified_user(
    token: str = Depends(oauth2_scheme),
    auth_service: AuthenticationService = Depends() # Assuming DI setup
) -> Optional[Dict[str, Any]]: # Returns the user payload dict or None
    user_payload = await auth_service.verify_user(token)
    if not user_payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # You might want to transform user_payload into an AuthenticatedSubject-like object here
    # or perform additional checks (e.g., user active status).
    return user_payload

# Example FastAPI route
# @app.get("/users/me")
# async def read_users_me(current_user: Dict[str, Any] = Depends(get_current_verified_user)):
#     return current_user
The user dictionary returned by verify_user contains the claims from the token (if ID token verification) or the introspection response (possibly merged with userinfo). This data can then be used for authorization decisions.

Error Handling

The service logs errors during its operations (e.g., HTTP errors, JSON decoding errors, cache errors). Applications should handle the None return from verify_user to indicate authentication failure to the client.