Commands, Command Handlers, and Command Bus

In a CQRS (Command Query Responsibility Segregation) architecture, Commands represent an intent to change the state of the system. castlecraft-engineer provides a structured way to define, handle, and dispatch these commands.

1. The Command Base Class

All commands in your application should inherit from the Command base class, located in castlecraft_engineer.abstractions.command.Command.

  • Purpose: A Command is a simple data structure (often a dataclass) that encapsulates all the information required to perform a specific action or state change. It's a message that expresses an intent.
  • Characteristics:
    • They are typically named in the imperative mood (e.g., CreateUserCommand, UpdateProductPriceCommand).
    • They should be immutable once created (e.g., using @dataclass(frozen=True)).
    • They carry data but no behavior.
# Example: Defining a Command
from dataclasses import dataclass
import uuid
from castlecraft_engineer.abstractions.command import Command

@dataclass(frozen=True)
class RegisterUserCommand(Command):
    # user_id is typically generated by the system upon creation, not passed in the command.
    username: str
    email: str
    password_hash: str # Password should be pre-hashed by the caller or a dedicated service
    creator_id: uuid.UUID # Example: ID of the user initiating the command
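
The immutability point can be checked directly. The following is a minimal sketch that runs without the library installed: the Command class here is a local stand-in for the real base class, and RenameUserCommand is a hypothetical command used only for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


class Command:
    """Local stand-in for castlecraft_engineer's Command base class (assumption)."""


@dataclass(frozen=True)
class RenameUserCommand(Command):  # hypothetical command, for illustration only
    user_id: str
    new_username: str


cmd = RenameUserCommand(user_id="u-1", new_username="alice")

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    cmd.new_username = "bob"
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a command, build a new instance; the original is left untouched.
renamed = replace(cmd, new_username="bob")

print(mutated, cmd.new_username, renamed.new_username)
```

Because a frozen command cannot be altered after dispatch, a handler can rely on the data it receives being exactly what the caller sent.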

2. The CommandHandler Base Class

Every command should have a corresponding CommandHandler, which contains the logic that processes it. Handlers inherit from castlecraft_engineer.abstractions.command_handler.CommandHandler[TCommand]. The examples here assume that the base class is generic over the command type and that its execute method returns Any or None.

  • Purpose: To encapsulate the business logic required to execute a specific command. This often involves interacting with domain models (Aggregates), repositories, and other services.
  • Key Methods:
    • execute(self, command: TCommand) -> Any (or async execute): This is the main method where the command's logic is implemented. It receives the command object and performs the necessary actions. The return type is Any by default in the base class, allowing handlers to return specific results (like an ID) or None.
    • authorize(self, command: TCommand) -> bool (or async authorize): This method is called by the CommandBus before execute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given command. If authorization fails, it should raise an AuthorizationError. Implementations should return True if authorized.
# Example: Defining a CommandHandler
import uuid
from typing import Any # For execute return type if needed
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from castlecraft_engineer.abstractions.command_handler import CommandHandler
# from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository # Example
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error

# Assuming RegisterUserCommand is defined as above
# Assuming User and UserModel are defined domain/SQL models
# from your_domain.user import User, UserModel

# class UserRepository(AsyncAggregateRepository[uuid.UUID, User, UserModel]):
#     def __init__(self):
#         super().__init__(aggregate_cls=User, model_cls=UserModel)

# The handler is parameterized by the command type it processes
class RegisterUserHandler(CommandHandler[RegisterUserCommand]):
    def __init__(self, session: AsyncSession, auth_service: AuthorizationService): #, user_repo: UserRepository):
        self.session = session
        # self.user_repo = user_repo
        self.auth_service = auth_service # Example for authorization

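    # **kwargs absorbs any context the bus passes through (e.g., subject_id, permissions).
    async def authorize(self, command: RegisterUserCommand, **kwargs: Any) -> bool: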
        # Example: Check if the current context allows user registration.
        # This is a simplified check. Real checks might involve user roles, permissions, etc.
        # await self.auth_service.check_permission(subject_id=str(command.creator_id), required_permissions=[...])
        print(f"Authorizing RegisterUserCommand for user: {command.username} by {command.creator_id}")
        # If not authorized, raise: raise AuthorizationError("Not authorized to register user.")
        return True # For this example, always allow

    # **kwargs absorbs any context the bus passes through; the return value is the new user's ID.
    async def execute(self, command: RegisterUserCommand, **kwargs: Any) -> uuid.UUID:
        print(f"Executing RegisterUserCommand for: {command.username}, {command.email} by {command.creator_id}")

        new_user_id = uuid.uuid4() # ID is generated here

        # 1. Create a new User aggregate (example)
        # user = User.create(id=new_user_id, username=command.username, email=command.email, password_hash=command.password_hash, creator_id=command.creator_id)
        # 2. Save the aggregate using the repository
        # await self.user_repo.save(user, self.session)
        # 3. Commit the session (often handled by a Unit of Work or middleware)
        # await self.session.commit()
        # 4. Optionally, publish events (also often handled by UoW)
        # for event in user.pull_uncommitted_events():
        #     await self.event_publisher.publish(event)
        print(f"User {command.username} (ID: {new_user_id}) would be created here.")
        return new_user_id # Return the ID of the newly created user
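
The handler above always authorizes. As a rough sketch of the deny path, the snippet below raises on failure and returns True on success, as described for authorize. It is self-contained so it can run without the library: AuthorizationError is a local stand-in for castlecraft_engineer.exc.AuthorizationError, and DeleteUserCommand, DeleteUserHandler, and the role field are hypothetical names invented for this illustration.

```python
import asyncio
from dataclasses import dataclass


class AuthorizationError(Exception):
    """Local stand-in for castlecraft_engineer.exc.AuthorizationError (assumption)."""


@dataclass(frozen=True)
class DeleteUserCommand:  # hypothetical command
    target_username: str
    requested_by_role: str


class DeleteUserHandler:  # hypothetical handler; no base class so the sketch runs standalone
    async def authorize(self, command: DeleteUserCommand) -> bool:
        # Deny by raising, allow by returning True.
        if command.requested_by_role != "admin":
            raise AuthorizationError("Only admins may delete users.")
        return True

    async def execute(self, command: DeleteUserCommand) -> str:
        return f"deleted {command.target_username}"


async def run(command: DeleteUserCommand) -> str:
    """Mimic the authorize-then-execute order for a single handler."""
    handler = DeleteUserHandler()
    try:
        await handler.authorize(command)
    except AuthorizationError as exc:
        return f"denied: {exc}"
    return await handler.execute(command)


allowed = asyncio.run(run(DeleteUserCommand("bob", "admin")))
denied = asyncio.run(run(DeleteUserCommand("bob", "guest")))
print(allowed)
print(denied)
```

Raising instead of returning False keeps the failure reason attached to the exception, which the caller can log or translate into an error response.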

3. The CommandBus

The CommandBus (from castlecraft_engineer.abstractions.command_bus.CommandBus) acts as a central dispatcher for commands. It decouples the part of your application that issues a command (e.g., an API endpoint) from the handler that processes it.

  • Purpose: To route commands to their appropriate handlers.
  • Key Functionalities:
    • Handler Registration: Command Handlers are registered with the CommandBus for specific Command types. This is often done at application startup, typically using a Dependency Injection (DI) container.
      • castlecraft-engineer's CommandBus (when used with ContainerBuilder) allows registration via builder.command_bus.register(CommandType, CommandHandlerType).
    • Handler Resolution: When a command is dispatched, the bus looks up the registered handler for that command's type. castlecraft-engineer's CommandBus resolves handlers from a punq.Container (the DI container).
    • Execution Flow: When command_bus.execute(command, **kwargs) is called (where kwargs can include subject_id, permissions for authorization):
      1. The bus resolves the handler for the type of command.
      2. It calls the handler's authorize(command, **kwargs) method (passing through relevant kwargs).
      3. If authorization succeeds (returns True), it calls the handler's execute(command, **kwargs) method (passing through relevant kwargs).
      4. It returns the result from the handler's execute method.
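
The four steps above can be sketched with a toy dispatcher. This is not castlecraft-engineer's CommandBus: TinyCommandBus, PingCommand, and PingHandler are hypothetical, handler resolution uses a plain dictionary of factories instead of a punq.Container, and AuthorizationError is a local stand-in. It only illustrates the order of operations.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


class AuthorizationError(Exception):
    """Local stand-in for the library's AuthorizationError (assumption)."""


class TinyCommandBus:
    """Illustrative dispatcher only; not the library's implementation."""

    def __init__(self) -> None:
        # Maps a command type to a factory that produces its handler.
        self._factories: Dict[type, Callable[[], Any]] = {}

    def register(self, command_type: type, handler_factory: Callable[[], Any]) -> None:
        self._factories[command_type] = handler_factory

    async def execute(self, command: Any, **kwargs: Any) -> Any:
        handler = self._factories[type(command)]()          # 1. resolve the handler
        if not await handler.authorize(command, **kwargs):  # 2. authorize
            raise AuthorizationError("Not authorized.")
        return await handler.execute(command, **kwargs)     # 3. execute, 4. return result


@dataclass(frozen=True)
class PingCommand:  # hypothetical command
    payload: str


class PingHandler:  # hypothetical handler that records the call order
    def __init__(self) -> None:
        self.calls: List[str] = []

    async def authorize(self, command: PingCommand, **kwargs: Any) -> bool:
        self.calls.append("authorize")
        return kwargs.get("subject_id") is not None

    async def execute(self, command: PingCommand, **kwargs: Any) -> str:
        self.calls.append("execute")
        return f"pong:{command.payload}"


handler = PingHandler()
bus = TinyCommandBus()
bus.register(PingCommand, lambda: handler)

result = asyncio.run(bus.execute(PingCommand("hi"), subject_id="user-1"))
print(result, handler.calls)
```

Note that both handler methods accept **kwargs here, so context such as subject_id can be passed through without changing their signatures.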

4. CommandHandlerNotFoundError Exception

If the CommandBus receives a command for which no handler has been registered, it will raise a CommandHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.command_bus and re-exported in castlecraft_engineer.exc.

It's important to ensure all your commands have corresponding handlers registered with the bus.
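
One way to catch a missing registration early is a startup or test-time check that compares the known command classes against what was registered. The sketch below assumes the application keeps its own mapping of command types to handler types alongside the bus registrations; that mapping, the find_unhandled helper, and the order commands are hypothetical, and Command is a local stand-in for the library's base class.

```python
from typing import Dict, List


class Command:
    """Local stand-in for the library's Command base class (assumption)."""


class CreateOrderCommand(Command):  # hypothetical command
    pass


class CancelOrderCommand(Command):  # hypothetical command with no handler yet
    pass


class CreateOrderHandler:  # hypothetical handler
    pass


# Hypothetical mapping mirroring what the application registered with the bus.
registered: Dict[type, type] = {CreateOrderCommand: CreateOrderHandler}


def all_subclasses(cls: type) -> List[type]:
    """Collect direct and indirect subclasses of cls."""
    found: List[type] = []
    for sub in cls.__subclasses__():
        found.append(sub)
        found.extend(all_subclasses(sub))
    return found


def find_unhandled(base: type, registry: Dict[type, type]) -> List[str]:
    """Return the names of command classes that have no registered handler."""
    return sorted(c.__name__ for c in all_subclasses(base) if c not in registry)


missing = find_unhandled(Command, registered)
print(missing)
```

Running such a check in a test suite turns a runtime CommandHandlerNotFoundError into a failure that is visible before deployment.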

5. Code Example: Wiring Up and Executing

This example shows how to set up the DI container, register the handler, and execute a command via the bus.

import asyncio
import uuid
from sqlalchemy.ext.asyncio import AsyncSession # For type hinting
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import CommandHandlerNotFoundError, AuthorizationError

# Assuming RegisterUserCommand and RegisterUserHandler are defined as in the examples above
# from your_app.commands import RegisterUserCommand  # Defined above for this script
# from your_app.handlers import RegisterUserHandler # Defined above for this script

async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession for this example if not using a real DB
    # In a real app, builder.with_async_database() would be used.
    class MockAsyncSession:
        async def commit(self): print("MockSession: commit called")
        async def rollback(self): print("MockSession: rollback called")
        # Add other methods your handler might expect from AsyncSession if any

    builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
    builder.register(AsyncSession, instance=MockAsyncSession()) # Register an instance of the mock
    # builder.register(UserRepository) # If UserRepository was used by the handler

    # Register CommandBus and the handler
    builder.with_command_bus()
    # The CommandBus instance is available on the builder after with_command_bus()
    # RegisterUserHandler depends on AsyncSession and AuthorizationService
    builder.command_bus.register(RegisterUserCommand, RegisterUserHandler)

    container = builder.build()

    # 2. Resolve CommandBus
    command_bus = container.resolve(CommandBus)

    # 3. Create and Execute Command
    initiating_user_id = uuid.uuid4() # Example ID of the user performing the action
    cmd = RegisterUserCommand(
        username="testuser",
        email="test@example.com",
        password_hash="hashed_password_example",
        creator_id=initiating_user_id
    )

    try:
        print(f"Dispatching command: {cmd}")
        # Pass subject_id for authorization context if needed by your auth service
        created_user_id = await command_bus.execute(cmd, subject_id=str(initiating_user_id))
        print(f"Command executed. Result (new user_id): {created_user_id}")
    except CommandHandlerNotFoundError:
        print(f"Error: No handler found for {type(cmd).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    # To run this example, ensure RegisterUserCommand and RegisterUserHandler
    # are defined in this script (as they are in the example snippets).
    asyncio.run(main())

This illustrates the complete flow from defining commands and handlers to dispatching them through the CommandBus with DI integration. The authorize method in the handler provides a hook for permission checks before execution.