
Commands, Command Handlers, and Command Bus

In a CQRS (Command Query Responsibility Segregation) architecture, Commands represent an intent to change the state of the system. castlecraft-engineer provides a structured way to define, handle, and dispatch these commands.

1. The Command Base Class

All commands in your application should inherit from the Command base class, located in castlecraft_engineer.abstractions.command.Command.

  • Purpose: A Command is a simple data structure (often a dataclass) that encapsulates all the information required to perform a specific action or state change. It's a message that expresses an intent.
  • Characteristics:
    • They are typically named in the imperative mood (e.g., CreateUserCommand, UpdateProductPriceCommand).
    • They should be immutable once created.
    • They carry data but no behavior.
# Example: Defining a Command
from dataclasses import dataclass
import uuid
from castlecraft_engineer.abstractions.command import Command

@dataclass(frozen=True)
class RegisterUserCommand(Command):
    user_id: uuid.UUID # Could be generated by the handler or passed in
    username: str
    email: str
    password_hash: str # Assuming password is already hashed
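
The immutability mentioned above can be checked directly. The following is a minimal sketch, assuming a stand-in Command base class so that the snippet runs without castlecraft-engineer installed; it is not the library's own class. It shows that a frozen dataclass rejects attribute assignment, and that dataclasses.replace builds a modified copy instead.

```python
import dataclasses
import uuid
from dataclasses import dataclass


class Command:
    """Stand-in for castlecraft_engineer.abstractions.command.Command (assumption)."""


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    user_id: uuid.UUID
    username: str
    email: str
    password_hash: str


cmd = RegisterUserCommand(
    user_id=uuid.uuid4(),
    username="testuser",
    email="test@example.com",
    password_hash="hashed_password_example",
)

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    cmd.username = "someone_else"  # type: ignore[misc]
    mutation_blocked = False
except dataclasses.FrozenInstanceError:
    mutation_blocked = True

# To "change" a command, build a new instance from the old one.
renamed = dataclasses.replace(cmd, username="someone_else")
```

Keeping commands frozen means a handler can rely on the data it was given not changing between authorization and execution.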

2. The CommandHandler Base Class

For every command, there should be a corresponding CommandHandler. Command Handlers contain the actual logic to process a command. They inherit from castlecraft_engineer.abstractions.command_handler.CommandHandler[TCommand, TResult].

  • Purpose: To encapsulate the business logic required to execute a specific command. This often involves interacting with domain models (Aggregates), repositories, and other services.
  • Key Methods:
    • execute(self, command: TCommand) -> TResult (or async execute for asynchronous handlers): This is the main method where the command's logic is implemented. It receives the command object and performs the necessary actions. The TResult can be None if the command doesn't produce a direct result (common for commands), or it can be a simple acknowledgement or identifier.
    • authorize(self, command: TCommand) -> None (or async authorize): This method is called by the CommandBus before execute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given command. If authorization fails, it should raise an AuthorizationError (or a more specific custom exception).
# Example: Defining a CommandHandler
import uuid
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository # Example
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error

# Assuming RegisterUserCommand is defined as above
# Assuming User and UserModel are defined domain/SQL models
# from your_domain.user import User, UserModel

# class UserRepository(AsyncAggregateRepository[uuid.UUID, User, UserModel]):
#     pass

class RegisterUserHandler(CommandHandler[RegisterUserCommand, uuid.UUID]):
    def __init__(self, session: AsyncSession, auth_service: AuthorizationService): # user_repo: UserRepository):
        self.session = session
        # self.user_repo = user_repo
        self.auth_service = auth_service # Example for authorization
        # In a real handler, you'd inject repositories, domain services, etc.

    async def authorize(self, command: RegisterUserCommand) -> None:
        # Example: Check if the current context allows user registration.
        # This is a simplified check. Real checks might involve user roles, permissions, etc.
        # For this example, let's assume a permission like ('user', 'create', 'any')
        # await self.auth_service.check_permission(required_permissions=[...])
        print(f"Authorizing RegisterUserCommand for user: {command.username}")
        # If not authorized: raise AuthorizationError("Not authorized to register user.")
        # This example always allows the command, so the method simply returns.

    async def execute(self, command: RegisterUserCommand) -> uuid.UUID:
        print(f"Executing RegisterUserCommand for: {command.username}, {command.email}")
        # 1. Create a new User aggregate (example)
        # user = User.register(id=command.user_id, username=command.username, email=command.email, password_hash=command.password_hash)
        # 2. Save the aggregate using the repository
        # await self.user_repo.save(user, self.session)
        # 3. Commit the session (often handled by a Unit of Work or middleware)
        # await self.session.commit()
        # 4. Optionally, publish events (also often handled by UoW)
        # for event in user.pull_uncommitted_events():
        #     await self.event_publisher.publish(event)
        print(f"User {command.username} (ID: {command.user_id}) would be created here.")
        return command.user_id # Return the ID of the newly created user
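
Because a handler is an ordinary class, its authorize and execute methods can also be exercised directly, without a bus. The sketch below is a simplified, stdlib-only variant of the handler above. AuthorizationError is a stand-in for the library's exception, and FakeAuthService with its can_register_users method is a hypothetical test double, not part of castlecraft-engineer.

```python
import asyncio
import uuid
from dataclasses import dataclass


class AuthorizationError(Exception):
    """Stand-in for castlecraft_engineer.exc.AuthorizationError (assumption)."""


@dataclass(frozen=True)
class RegisterUserCommand:
    user_id: uuid.UUID
    username: str


class FakeAuthService:
    """Hypothetical test double; not the library's AuthorizationService."""

    def __init__(self, allow: bool) -> None:
        self.allow = allow

    async def can_register_users(self) -> bool:
        return self.allow


class RegisterUserHandler:
    def __init__(self, auth_service: FakeAuthService) -> None:
        self.auth_service = auth_service

    async def authorize(self, command: RegisterUserCommand) -> None:
        # Signal a failed check by raising, as the text above describes.
        if not await self.auth_service.can_register_users():
            raise AuthorizationError("Not authorized to register user.")

    async def execute(self, command: RegisterUserCommand) -> uuid.UUID:
        # Persistence is omitted here; the handler only returns the ID.
        return command.user_id


async def run_handler(allow: bool, command: RegisterUserCommand) -> uuid.UUID:
    handler = RegisterUserHandler(FakeAuthService(allow))
    await handler.authorize(command)  # authorize first, then execute
    return await handler.execute(command)


cmd = RegisterUserCommand(user_id=uuid.uuid4(), username="testuser")
allowed_result = asyncio.run(run_handler(True, cmd))

try:
    asyncio.run(run_handler(False, cmd))
    denied = False
except AuthorizationError:
    denied = True
```

Injecting the authorization dependency through the constructor is what makes this kind of isolated test possible.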

3. The CommandBus

The CommandBus (from castlecraft_engineer.abstractions.command_bus.CommandBus) acts as a central dispatcher for commands. It decouples the part of your application that issues a command (e.g., an API endpoint) from the handler that processes it.

  • Purpose: To route commands to their appropriate handlers.
  • Key Functionalities:
    • Handler Registration: Command Handlers are registered with the CommandBus for specific Command types. This is often done at application startup, typically using a Dependency Injection (DI) container.
      • castlecraft-engineer's CommandBus uses a decorator for registration: @command_bus.register(CommandType, CommandHandlerType) when used with the ContainerBuilder from common/di.py.
    • Handler Resolution: When a command is dispatched, the bus looks up the registered handler for that command's type. castlecraft-engineer's CommandBus resolves handlers from a punq.Container (the DI container).
    • Execution Flow: When command_bus.execute(command) is called:
      1. The bus resolves the handler for the type of command.
      2. It calls the handler's authorize(command) method.
      3. If authorization succeeds, it calls the handler's execute(command) method.
      4. It returns the result from the handler's execute method.
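
The execution flow above can be sketched with a small in-memory bus. This is an illustration only, not castlecraft-engineer's implementation: SketchCommandBus, HandlerNotFound, PingCommand, and PingHandler are hypothetical names, and handlers are built from plain factories rather than resolved from a punq.Container.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


class HandlerNotFound(Exception):
    """Stand-in for CommandHandlerNotFoundError (assumption)."""


class SketchCommandBus:
    """Illustrative bus that maps command types to handler factories."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], Any]] = {}

    def register(self, command_type: type, handler_factory: Callable[[], Any]) -> None:
        self._factories[command_type] = handler_factory

    async def execute(self, command: Any) -> Any:
        factory = self._factories.get(type(command))
        if factory is None:
            raise HandlerNotFound(type(command).__name__)
        handler = factory()                    # 1. resolve the handler
        await handler.authorize(command)       # 2. authorize (raises on failure)
        return await handler.execute(command)  # 3 and 4. execute, return its result


@dataclass(frozen=True)
class PingCommand:
    payload: str


calls: List[str] = []  # records the order in which handler methods run


class PingHandler:
    async def authorize(self, command: PingCommand) -> None:
        calls.append("authorize")

    async def execute(self, command: PingCommand) -> str:
        calls.append("execute")
        return command.payload.upper()


bus = SketchCommandBus()
bus.register(PingCommand, PingHandler)
result = asyncio.run(bus.execute(PingCommand(payload="ping")))
```

Since authorize signals failure by raising, the bus needs no explicit branch: an exception in step 2 stops execute from ever being called.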

4. CommandHandlerNotFoundError Exception

If the CommandBus receives a command for which no handler has been registered, it will raise a CommandHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.command_bus and re-exported in castlecraft_engineer.exc.

It's important to ensure all your commands have corresponding handlers registered with the bus.
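
One way to catch a missing registration early is a startup check that compares the known command types against the registered ones. The sketch below assumes the application keeps its own mapping of command type to handler type. The registry dict and the find_unhandled_commands helper are hypothetical, and the sketch does not rely on any introspection API of the real CommandBus.

```python
from typing import Dict, Iterable, List


class Command:
    """Stand-in base class (assumption)."""


class CreateUserCommand(Command):
    pass


class DeleteUserCommand(Command):
    pass


class CreateUserHandler:
    pass


def find_unhandled_commands(
    command_types: Iterable[type], registry: Dict[type, type]
) -> List[str]:
    """Return the names of command types that have no handler in the registry."""
    return sorted(c.__name__ for c in command_types if c not in registry)


# Hypothetical application-level record of what was registered with the bus.
registry: Dict[type, type] = {CreateUserCommand: CreateUserHandler}

# __subclasses__() lists direct subclasses only; nested hierarchies need a recursive walk.
missing = find_unhandled_commands(Command.__subclasses__(), registry)
```

Running a check like this in a test or at application startup surfaces the problem before a command is dispatched in production.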

5. Code Example: Wiring Up and Executing

This example shows how to set up the DI container, register the handler, and execute a command via the bus.

import asyncio
import uuid
from sqlalchemy.ext.asyncio import AsyncSession # The type the handler declares for its session dependency
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import AuthorizationError, CommandHandlerNotFoundError # Caught in main() below

# Assuming RegisterUserCommand and RegisterUserHandler are defined as above
# from your_app.commands import RegisterUserCommand
# from your_app.handlers import RegisterUserHandler

async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession factory for this example if not using a real DB
    # In a real app, builder.with_async_database() would be used.
    class MockAsyncSession:
        async def __aenter__(self): return self
        async def __aexit__(self, exc_type, exc, tb): pass
        async def commit(self): print("MockSession: commit called")
        async def rollback(self): print("MockSession: rollback called")

    builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
    # Register the mock under the AsyncSession type so the handler's `session: AsyncSession`
    # dependency resolves to it (assumes the same register(service, implementation) form as the line above).
    builder.register(AsyncSession, MockAsyncSession)
    # builder.register(UserRepository) # If UserRepository was used by the handler

    # Register CommandBus and the handler
    builder.with_command_bus()
    # The CommandBus instance is available on the builder after with_command_bus()
    # RegisterUserHandler depends on AsyncSession and AuthorizationService
    builder.command_bus.register(RegisterUserCommand, RegisterUserHandler)

    container = builder.build()

    # 2. Resolve CommandBus
    command_bus = container.resolve(CommandBus)

    # 3. Create and Execute Command
    user_id_to_create = uuid.uuid4()
    cmd = RegisterUserCommand(
        user_id=user_id_to_create,
        username="testuser",
        email="test@example.com",
        password_hash="hashed_password_example"
    )

    try:
        print(f"Dispatching command: {cmd}")
        created_user_id = await command_bus.execute(cmd)
        print(f"Command executed. Result (user_id): {created_user_id}")
        assert created_user_id == user_id_to_create
    except CommandHandlerNotFoundError:
        print(f"Error: No handler found for {type(cmd).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    # To run this example, ensure your_app.commands and your_app.handlers
    # are replaced with actual paths or the classes are defined in this script.
    # For simplicity, if you copy-paste, define RegisterUserCommand and RegisterUserHandler
    # directly in this script before main().
    asyncio.run(main())

This illustrates the complete flow from defining commands and handlers to dispatching them through the CommandBus with DI integration. The authorize method in the handler provides a hook for permission checks before execution.