Commands, Command Handlers, and Command Bus¶
In a CQRS (Command Query Responsibility Segregation) architecture, Commands represent an intent to change the state of the system. castlecraft-engineer provides a structured way to define, handle, and dispatch these commands.
1. The Command Base Class¶
All commands in your application should inherit from the Command base class, located in castlecraft_engineer.abstractions.command.Command.
- Purpose: A
Commandis a simple data structure (often a dataclass) that encapsulates all the information required to perform a specific action or state change. It's a message that expresses an intent. - Characteristics:
- They are typically named in an imperative mood (e.g.,
CreateUserCommand,UpdateProductPriceCommand). - They should be immutable once created.
- They carry data but no behavior.
- They are typically named in an imperative mood (e.g.,
# Example: Defining a Command
from dataclasses import dataclass
import uuid
from castlecraft_engineer.abstractions.command import Command
@dataclass(frozen=True)
class RegisterUserCommand(Command):
user_id: uuid.UUID # Could be generated by the handler or passed in
username: str
email: str
password_hash: str # Assuming password is already hashed
2. The CommandHandler Base Class¶
For every command, there should be a corresponding CommandHandler. Command Handlers contain the actual logic to process a command. They inherit from castlecraft_engineer.abstractions.command_handler.CommandHandler[TCommand, TResult].
- Purpose: To encapsulate the business logic required to execute a specific command. This often involves interacting with domain models (Aggregates), repositories, and other services.
- Key Methods:
execute(self, command: TCommand) -> TResult(orasync executefor asynchronous handlers): This is the main method where the command's logic is implemented. It receives the command object and performs the necessary actions. TheTResultcan beNoneif the command doesn't produce a direct result (common for commands), or it can be a simple acknowledgement or identifier.authorize(self, command: TCommand) -> None(orasync authorize): This method is called by theCommandBusbeforeexecute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given command. If authorization fails, it should raise anAuthorizationError(or a more specific custom exception).
# Example: Defining a CommandHandler
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository # Example
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error
# Assuming RegisterUserCommand is defined as above
# Assuming User and UserModel are defined domain/SQL models
# from your_domain.user import User, UserModel
# class UserRepository(AsyncAggregateRepository[uuid.UUID, User, UserModel]):
# pass
class RegisterUserHandler(CommandHandler[RegisterUserCommand, uuid.UUID]):
def __init__(self, session: AsyncSession, auth_service: AuthorizationService): # user_repo: UserRepository):
self.session = session
# self.user_repo = user_repo
self.auth_service = auth_service # Example for authorization
# In a real handler, you'd inject repositories, domain services, etc.
async def authorize(self, command: RegisterUserCommand) -> None:
# Example: Check if the current context allows user registration.
# This is a simplified check. Real checks might involve user roles, permissions, etc.
# For this example, let's assume a permission like ('user', 'create', 'any')
# await self.auth_service.check_permission(required_permissions=[...])
print(f"Authorizing RegisterUserCommand for user: {command.username}")
# If not authorized, raise: raise AuthorizationError("Not authorized to register user.")
pass # For this example, always allow
async def execute(self, command: RegisterUserCommand) -> uuid.UUID:
print(f"Executing RegisterUserCommand for: {command.username}, {command.email}")
# 1. Create a new User aggregate (example)
# user = User.register(id=command.user_id, username=command.username, email=command.email, password_hash=command.password_hash)
# 2. Save the aggregate using the repository
# await self.user_repo.save(user, self.session)
# 3. Commit the session (often handled by a Unit of Work or middleware)
# await self.session.commit()
# 4. Optionally, publish events (also often handled by UoW)
# for event in user.pull_uncommitted_events():
# await self.event_publisher.publish(event)
print(f"User {command.username} (ID: {command.user_id}) would be created here.")
return command.user_id # Return the ID of the newly created user
3. The CommandBus¶
The CommandBus (from castlecraft_engineer.abstractions.command_bus.CommandBus) acts as a central dispatcher for commands. It decouples the part of your application that issues a command (e.g., an API endpoint) from the handler that processes it.
- Purpose: To route commands to their appropriate handlers.
- Key Functionalities:
- Handler Registration: Command Handlers are registered with the
CommandBusfor specificCommandtypes. This is often done at application startup, typically using a Dependency Injection (DI) container.castlecraft-engineer'sCommandBususes a decorator for registration:@command_bus.register(CommandType, CommandHandlerType)when used with theContainerBuilderfromcommon/di.py.
- Handler Resolution: When a command is dispatched, the bus looks up the registered handler for that command's type.
castlecraft-engineer'sCommandBusresolves handlers from apunq.Container(the DI container). - Execution Flow: When
command_bus.execute(command)is called:- The bus resolves the handler for the type of
command. - It calls the handler's
authorize(command)method. - If authorization succeeds, it calls the handler's
execute(command)method. - It returns the result from the handler's
executemethod.
- The bus resolves the handler for the type of
- Handler Registration: Command Handlers are registered with the
4. CommandHandlerNotFoundError Exception¶
If the CommandBus receives a command for which no handler has been registered, it will raise a CommandHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.command_bus and re-exported in castlecraft_engineer.exc.
It's important to ensure all your commands have corresponding handlers registered with the bus.
5. Code Example: Wiring Up and Executing¶
This example shows how to set up the DI container, register the handler, and execute a command via the bus.
import asyncio
import uuid
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
# Assuming RegisterUserCommand and RegisterUserHandler are defined as above
# from your_app.commands import RegisterUserCommand
# from your_app.handlers import RegisterUserHandler
async def main():
# 1. Configure DI Container
builder = ContainerBuilder()
# Register a dummy AsyncSession factory for this example if not using a real DB
# In a real app, builder.with_async_database() would be used.
class MockAsyncSession:
async def __aenter__(self): return self
async def __aexit__(self, exc_type, exc, tb): pass
async def commit(self): print("MockSession: commit called")
async def rollback(self): print("MockSession: rollback called")
builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
builder.register(MockAsyncSession) # Registering the mock session itself as the factory for AsyncSession type hint
# builder.register(UserRepository) # If UserRepository was used by the handler
# Register CommandBus and the handler
builder.with_command_bus()
# The CommandBus instance is available on the builder after with_command_bus()
# RegisterUserHandler depends on AsyncSession and AuthorizationService
builder.command_bus.register(RegisterUserCommand, RegisterUserHandler)
container = builder.build()
# 2. Resolve CommandBus
command_bus = container.resolve(CommandBus)
# 3. Create and Execute Command
user_id_to_create = uuid.uuid4()
cmd = RegisterUserCommand(
user_id=user_id_to_create,
username="testuser",
email="test@example.com",
password_hash="hashed_password_example"
)
try:
print(f"Dispatching command: {cmd}")
created_user_id = await command_bus.execute(cmd)
print(f"Command executed. Result (user_id): {created_user_id}")
assert created_user_id == user_id_to_create
except CommandHandlerNotFoundError:
print(f"Error: No handler found for {type(cmd).__name__}")
except AuthorizationError as e:
print(f"Authorization Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
if __name__ == "__main__":
# To run this example, ensure your_app.commands and your_app.handlers
# are replaced with actual paths or the classes are defined in this script.
# For simplicity, if you copy-paste, define RegisterUserCommand and RegisterUserHandler
# directly in this script before main().
asyncio.run(main())
This illustrates the complete flow from defining commands and handlers to dispatching them through the CommandBus with DI integration. The authorize method in the handler provides a hook for permission checks before execution.