Skip to content

castlecraft_engineer.authorization

castlecraft_engineer.authorization

Action

Bases: BaseStringEnum

Common actions. Extendable by creating new enums.

Source code in src/castlecraft_engineer/authorization/types.py
class Action(BaseStringEnum):
    """Common actions. Extendable by creating new enums.

    Each member's value is the lowercase form of its name.
    """

    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    LIST = "list"

AllowAllAuthorizationService

Bases: AuthorizationService

An authorization service that always allows access.

Source code in src/castlecraft_engineer/authorization/default_services.py
class AllowAllAuthorizationService(AuthorizationService):
    """Permissive authorization service: every permission check succeeds."""

    async def check_permission(
        self,
        subject_id: Optional[str],
        required_permissions: List[Permission],
        provided_permissions: Optional[List[str]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Grant access unconditionally.

        None of the arguments are inspected.

        Returns:
            Always True.
        """
        return True

check_permission(subject_id, required_permissions, provided_permissions=None, context=None) async

Always allows the request.

Source code in src/castlecraft_engineer/authorization/default_services.py
async def check_permission(
    self,
    subject_id: Optional[str],
    required_permissions: List[Permission],
    provided_permissions: Optional[List[str]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Grant access unconditionally.

    None of the arguments are inspected.

    Returns:
        Always True.
    """
    return True

AuthorizationService

Bases: ABC

Abstract interface for authorization checks. Implementations connect to engines like Casbin, OPA, SpiceDB, etc.

Source code in src/castlecraft_engineer/authorization/base_service.py
class AuthorizationService(abc.ABC):
    """
    Contract that every authorization backend must fulfil.

    Concrete subclasses bridge to a policy engine
    (Casbin, OPA, SpiceDB, etc.) and implement
    `check_permission`.
    """

    @abc.abstractmethod
    async def check_permission(
        self,
        subject_id: Optional[str],
        required_permissions: List[Permission],
        provided_permissions: Optional[List[str]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Decide whether the subject holds the required permissions.

        Args:
            subject_id: Identifier of the user/service performing
                the action. May be None for anonymous checks
                if the policy supports them.
            required_permissions: Permission objects declared by
                the handler via the @ctx decorator.
            provided_permissions: Optional permissions/attributes
                associated with the subject (e.g., from token claims
                like roles, groups, or other string-based attributes),
                for engines that need them directly for evaluation.
            context: Optional additional data relevant to the
                decision (e.g., resource owner, tenant id,
                command/query data).

        Returns:
            True if authorized.

        Raises:
            AuthorizationError: If the check fails. Raising is
                often preferred over returning False so that
                execution halts clearly.
            NotImplementedError: If the method is not implemented.
        """
        raise NotImplementedError()

check_permission(subject_id, required_permissions, provided_permissions=None, context=None) abstractmethod async

Checks if the subject has the required permissions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| subject_id | Optional[str] | Identifier of the user/service performing the action. Can be None for anonymous checks if supported by the policy. | required |
| required_permissions | List[Permission] | A list of Permission objects declared by the handler via the @ctx decorator. | required |
| provided_permissions | Optional[List[str]] | Optional list of permissions the subject possesses. | None |
| context | Optional[Dict[str, Any]] | Optional dictionary containing additional data for policy evaluation. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if authorized. |

Raises:

| Type | Description |
| --- | --- |
| AuthorizationError | If the check fails. This is often preferred over returning False to halt execution clearly. |
| NotImplementedError | If the method is not implemented. |

Source code in src/castlecraft_engineer/authorization/base_service.py
@abc.abstractmethod
async def check_permission(
    self,
    subject_id: Optional[str],
    required_permissions: List[Permission],
    provided_permissions: Optional[List[str]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Decide whether the subject holds the required permissions.

    Args:
        subject_id: Identifier of the user/service performing
            the action. May be None for anonymous checks
            if the policy supports them.
        required_permissions: Permission objects declared by
            the handler via the @ctx decorator.
        provided_permissions: Optional permissions/attributes
            associated with the subject (e.g., from token claims
            like roles, groups, or other string-based attributes),
            for engines that need them directly for evaluation.
        context: Optional additional data relevant to the
            decision (e.g., resource owner, tenant id,
            command/query data).

    Returns:
        True if authorized.

    Raises:
        AuthorizationError: If the check fails. Raising is
            often preferred over returning False so that
            execution halts clearly.
        NotImplementedError: If the method is not implemented.
    """
    raise NotImplementedError()

DenyAllAuthorizationService

Bases: AuthorizationService

An authorization service that always denies access.

Source code in src/castlecraft_engineer/authorization/default_services.py
class DenyAllAuthorizationService(AuthorizationService):
    """Restrictive authorization service: every permission check is refused."""

    async def check_permission(
        self,
        subject_id: Optional[str],
        required_permissions: List[Permission],
        provided_permissions: Optional[List[str]] = None,
        context: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Refuse the request unconditionally.

        `provided_permissions` and `context` are ignored; the subject
        and the required permissions are only forwarded to the error.

        Raises:
            AuthorizationError: Always.
        """
        denial_reason = "Access denied by DenyAll policy."
        raise AuthorizationError(subject_id, required_permissions, denial_reason)

check_permission(subject_id, required_permissions, provided_permissions=None, context=None) async

Always denies the request by raising an AuthorizationError.

Source code in src/castlecraft_engineer/authorization/default_services.py
async def check_permission(
    self,
    subject_id: Optional[str],
    required_permissions: List[Permission],
    provided_permissions: Optional[List[str]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Refuse the request unconditionally.

    `provided_permissions` and `context` are ignored; the subject
    and the required permissions are only forwarded to the error.

    Raises:
        AuthorizationError: Always.
    """
    denial_reason = "Access denied by DenyAll policy."
    raise AuthorizationError(subject_id, required_permissions, denial_reason)

Resource

Bases: BaseStringEnum

Common resource types. Extendable by creating new enums.

Source code in src/castlecraft_engineer/authorization/types.py
class Resource(BaseStringEnum):
    """Common resource types. Extendable by creating new enums.

    Only a single catch-all member is defined here.
    """

    # Add common resources if any, or leave empty for extension
    GENERIC = "generic"

Scope

Bases: BaseStringEnum

Common scopes. Extendable by creating new enums.

Source code in src/castlecraft_engineer/authorization/types.py
class Scope(BaseStringEnum):
    """
    Common scopes. Extendable by creating new enums.

    Each member's value is the lowercase form of its name.
    """

    ANY = "any"
    OWN = "own"

ctx(required_permissions)

Decorator to associate required permission context(s) with a handler method.

Injects 'required_permissions' (always as a list) into the keyword arguments passed to the decorated method, allowing the method's implementation to access it and perform authorization checks if needed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| required_permissions | Union[Permission, List[Permission]] | A single Permission object or a list of Permissions. | required |

Source code in src/castlecraft_engineer/authorization/permission.py
def ctx(required_permissions: Union[Permission, List[Permission]]):
    """
    Decorator to associate required permission context(s)
    with a handler method.

    Injects 'required_permissions' (always as a list)
    into the keyword arguments passed to the decorated method,
    allowing the method's implementation to access it and
    perform authorization checks if needed. Any
    'required_permissions' keyword supplied by the caller
    is overwritten.

    The permissions are snapshotted when the decorator is applied,
    and every call receives its own fresh list. A handler that
    mutates the injected list therefore cannot affect later calls,
    and later changes to the caller's original list are not seen.

    Args:
        required_permissions:
            A single Permission object or a list of Permissions.
            Other iterables of Permissions are accepted and
            normalized to a list.

    Returns:
        A decorator that wraps the handler method.
    """

    # Immutable snapshot: shields the decorator from later mutation of
    # the caller's list and normalizes any iterable input.
    if isinstance(required_permissions, Permission):
        perms_snapshot = (required_permissions,)
    else:
        perms_snapshot = tuple(required_permissions)

    def decorator(func):
        @wraps(func)
        def wrapper(handler_instance, *args, **kwargs):
            # Fresh list per call so handlers never share mutable state.
            kwargs["required_permissions"] = list(perms_snapshot)

            return func(handler_instance, *args, **kwargs)

        return wrapper

    return decorator

setup_authorization(container)

Sets up the authorization service based on environment configuration and registers it with the provided DI container.

Source code in src/castlecraft_engineer/authorization/setup.py
def setup_authorization(container: punq.Container):
    """
    Sets up the authorization service
    based on environment configuration
    and registers it with the provided DI container.

    The engine name is read from the environment variable named by
    ENV_AUTHORIZATION_ENGINE and defaults to AUTH_ENGINE_DENY when
    the variable is unset:

    - AUTH_ENGINE_ALLOW: an AllowAllAuthorizationService is used.
    - AUTH_ENGINE_DENY: a DenyAllAuthorizationService is used.
    - any other value: an AuthorizationService is expected to be
      registered in the container already and is resolved from it.

    If the selected service cannot be obtained, a
    DenyAllAuthorizationService is used as the fallback. The
    resulting instance is registered for AuthorizationService as a
    singleton. If even the fallback cannot be created, nothing is
    registered and an error is logged.

    Args:
        container: The punq container to resolve a custom service
            from and to register the chosen service with.
    """

    auth_engine_name = os.environ.get(
        ENV_AUTHORIZATION_ENGINE,
        AUTH_ENGINE_DENY,
    )
    auth_service_instance: AuthorizationService | None = None

    logger.info(
        "Attempting to configure authorization engine: "  # noqa: E501
        f"'{auth_engine_name}'",
    )

    if auth_engine_name == AUTH_ENGINE_ALLOW:
        try:
            logger.warning(
                "Configuring AllowAllAuthorizationService. Use with caution!",
            )
            auth_service_instance = AllowAllAuthorizationService()
        except Exception as e:
            # Fail closed: never leave the system open on an error.
            logger.critical(
                f"Failed to instantiate AllowAllAuthorizationService: {e}. "
                "Defaulting to DenyAllAuthorizationService due to critical error."
            )
            auth_service_instance = DenyAllAuthorizationService()
    elif auth_engine_name == AUTH_ENGINE_DENY:
        try:
            logger.info("Configuring DenyAllAuthorizationService.")
            auth_service_instance = DenyAllAuthorizationService()
        except Exception as e:
            logger.critical(
                f"Failed to instantiate DenyAllAuthorizationService: {e}. "
                "Defaulting to DenyAllAuthorizationService due to critical error."
            )
            # The fallback section below makes one more DenyAll attempt.
            auth_service_instance = None
    else:
        logger.info(
            f"Attempting to use custom pre-configured AuthorizationService for engine '{auth_engine_name}'."  # noqa: E501
        )
        try:
            auth_service_instance = container.resolve(AuthorizationService)
            logger.info(
                f"Successfully resolved pre-configured custom AuthorizationService: {auth_service_instance.__class__.__name__}"  # noqa: E501
            )
        except punq.MissingDependencyError:
            logger.error(
                f"Custom engine '{auth_engine_name}' selected, but no AuthorizationService "  # noqa: E501
                "found pre-registered in DI container. Ensure it's configured and registered by the application. Falling back."  # noqa: E501
            )
            auth_service_instance = None
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while trying to resolve custom AuthorizationService for engine '{auth_engine_name}': {e}. Falling back."  # noqa: E501
            )
            auth_service_instance = None

    # Fallback Section
    if auth_service_instance is None:
        logger.critical(
            "Authorization service could not be initialized as configured. "
            "Attempting to use DenyAllAuthorizationService as a final fallback."
        )
        try:
            auth_service_instance = DenyAllAuthorizationService()  # Final attempt
        except Exception as e:
            logger.critical(
                f"CRITICAL FAILURE: Could not instantiate even the final fallback "
                f"DenyAllAuthorizationService: {e}. Authorization will not be configured."
            )
            auth_service_instance = None

    # Compare against None rather than relying on truthiness: a custom
    # service that defines __bool__/__len__ could evaluate as falsy and
    # would otherwise be silently left unregistered.
    if auth_service_instance is not None:
        container.register(
            AuthorizationService,
            instance=auth_service_instance,
            scope=punq.Scope.singleton,
        )
        logger.info(
            "Registered AuthorizationService:"
            f" {auth_service_instance.__class__.__name__}"
        )
    else:
        logger.error(
            "CRITICAL: Could not determine or initialize "
            "any authorization service, including fallback!"
        )