Commands, Command Handlers, and Command Bus
In a CQRS (Command Query Responsibility Segregation) architecture, Commands represent an intent to change the state of the system. The castlecraft-engineer library provides a structured way to define, handle, and dispatch these commands.
1. The Command Base Class
All commands in your application should inherit from the Command base class, located in castlecraft_engineer.abstractions.command.Command.
- Purpose: A Command is a simple data structure (often a dataclass) that encapsulates all the information required to perform a specific action or state change. It is a message that expresses an intent.
- Characteristics:
  - Commands are typically named in the imperative mood (e.g., CreateUserCommand, UpdateProductPriceCommand).
  - They should be immutable once created (e.g., using @dataclass(frozen=True)).
  - They carry data but no behavior.
# Example: Defining a Command
from dataclasses import dataclass
import uuid

from castlecraft_engineer.abstractions.command import Command


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    # user_id is typically generated by the system upon creation, not passed in the command.
    username: str
    email: str
    password_hash: str  # Password should be pre-hashed by the caller or a dedicated service
    creator_id: uuid.UUID  # Example: ID of the user initiating the command
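The immutability point above can be checked with the standard library alone. The following is a minimal sketch: RenameUserCommand is a hypothetical command introduced only for illustration, and it does not inherit from the library's Command base class so that the snippet runs without castlecraft-engineer installed.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class RenameUserCommand:  # hypothetical; a real command would inherit from Command
    user_id: str
    new_username: str


cmd = RenameUserCommand(user_id="u-1", new_username="alice")

try:
    cmd.new_username = "bob"  # assignment on a frozen dataclass is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a command, build a new instance instead of mutating the old one.
corrected = replace(cmd, new_username="bob")
```

Because the original instance never changes, a command can be logged, retried, or passed between layers without one component altering what another component sees.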
2. The CommandHandler Base Class
For every command, there should be a corresponding CommandHandler. Command Handlers contain the actual logic to process a command. They inherit from castlecraft_engineer.abstractions.command_handler.CommandHandler[TCommand] (assuming the base class is generic over the command type and its execute method returns Any or None).
- Purpose: To encapsulate the business logic required to execute a specific command. This often involves interacting with domain models (Aggregates), repositories, and other services.
- Key Methods:
  - execute(self, command: TCommand) -> Any (or async execute): This is the main method where the command's logic is implemented. It receives the command object and performs the necessary actions. The return type is Any by default in the base class, allowing handlers to return specific results (like an ID) or None.
  - authorize(self, command: TCommand) -> bool (or async authorize): This method is called by the CommandBus before execute. It checks whether the current context (e.g., user, permissions) is authorized to execute the given command. If authorization fails, it should raise an AuthorizationError. Implementations should return True if authorized.
# Example: Defining a CommandHandler
import uuid
from typing import Any  # Used to type the keyword arguments forwarded by the bus

from sqlalchemy.ext.asyncio import AsyncSession  # Assuming async operation

from castlecraft_engineer.abstractions.command_handler import CommandHandler
# from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository  # Example
from castlecraft_engineer.authorization.base_service import AuthorizationService  # Example
from castlecraft_engineer.exc import AuthorizationError  # Standard error

# Assuming RegisterUserCommand is defined as above
# Assuming User and UserModel are defined domain/SQL models
# from your_domain.user import User, UserModel

# class UserRepository(AsyncAggregateRepository[uuid.UUID, User, UserModel]):
#     def __init__(self):
#         super().__init__(aggregate_cls=User, model_cls=UserModel)


# The handler is parameterized by the command type it processes.
class RegisterUserHandler(CommandHandler[RegisterUserCommand]):
    def __init__(self, session: AsyncSession, auth_service: AuthorizationService):  # , user_repo: UserRepository):
        self.session = session
        # self.user_repo = user_repo
        self.auth_service = auth_service  # Example for authorization

    # **kwargs accepts the keyword arguments (e.g., subject_id, permissions)
    # that the bus is described as passing through in section 3.
    async def authorize(self, command: RegisterUserCommand, **kwargs: Any) -> bool:
        # Example: Check if the current context allows user registration.
        # This is a simplified check. Real checks might involve user roles, permissions, etc.
        # await self.auth_service.check_permission(subject_id=str(command.creator_id), required_permissions=[...])
        print(f"Authorizing RegisterUserCommand for user: {command.username} by {command.creator_id}")
        # If not authorized, raise: raise AuthorizationError("Not authorized to register user.")
        return True  # For this example, always allow

    async def execute(self, command: RegisterUserCommand, **kwargs: Any) -> uuid.UUID:  # Example: returning the new user's ID
        print(f"Executing RegisterUserCommand for: {command.username}, {command.email} by {command.creator_id}")
        new_user_id = uuid.uuid4()  # ID is generated here
        # 1. Create a new User aggregate (example)
        # user = User.create(id=new_user_id, username=command.username, email=command.email, password_hash=command.password_hash, creator_id=command.creator_id)
        # 2. Save the aggregate using the repository
        # await self.user_repo.save(user, self.session)
        # 3. Commit the session (often handled by a Unit of Work or middleware)
        # await self.session.commit()
        # 4. Optionally, publish events (also often handled by UoW)
        # for event in user.pull_uncommitted_events():
        #     await self.event_publisher.publish(event)
        print(f"User {command.username} (ID: {new_user_id}) would be created here.")
        return new_user_id  # Return the ID of the newly created user
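Because a handler is an ordinary class with authorize and execute methods, it can be exercised directly, without a bus, which is convenient in unit tests. The sketch below is self-contained and makes several assumptions: DeactivateUserCommand, DeactivateUserHandler, and the run helper are hypothetical names, the handler does not subclass the library's CommandHandler, and the local AuthorizationError is only a stand-in for castlecraft_engineer.exc.AuthorizationError.

```python
import asyncio
from dataclasses import dataclass


class AuthorizationError(Exception):
    """Stand-in for castlecraft_engineer.exc.AuthorizationError (assumption)."""


@dataclass(frozen=True)
class DeactivateUserCommand:  # hypothetical command
    user_id: str
    requested_by: str


class DeactivateUserHandler:  # hypothetical; a real handler would subclass CommandHandler
    def __init__(self, admins: set) -> None:
        self.admins = admins

    async def authorize(self, command: DeactivateUserCommand, **kwargs) -> bool:
        # Raise on failure, return True on success, as described above.
        if command.requested_by not in self.admins:
            raise AuthorizationError("Only admins may deactivate users.")
        return True

    async def execute(self, command: DeactivateUserCommand, **kwargs) -> str:
        return f"deactivated:{command.user_id}"


async def run(handler, command):
    """Mimic the documented order: authorize first, then execute."""
    await handler.authorize(command)
    return await handler.execute(command)


handler = DeactivateUserHandler(admins={"root"})
allowed = asyncio.run(run(handler, DeactivateUserCommand("u-1", "root")))

try:
    asyncio.run(run(handler, DeactivateUserCommand("u-2", "guest")))
    denied = False
except AuthorizationError:
    denied = True
```

Keeping the permission check in authorize rather than inside execute means the business logic can assume the caller has already been vetted.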
3. The CommandBus
The CommandBus (from castlecraft_engineer.abstractions.command_bus.CommandBus) acts as a central dispatcher for commands. It decouples the part of your application that issues a command (e.g., an API endpoint) from the handler that processes it.
- Purpose: To route commands to their appropriate handlers.
- Key Functionalities:
  - Handler Registration: Command Handlers are registered with the CommandBus for specific Command types. This is often done at application startup, typically using a Dependency Injection (DI) container. castlecraft-engineer's CommandBus (when used with ContainerBuilder) allows registration via builder.command_bus.register(CommandType, CommandHandlerType).
  - Handler Resolution: When a command is dispatched, the bus looks up the registered handler for that command's type. castlecraft-engineer's CommandBus resolves handlers from a punq.Container (the DI container).
  - Execution Flow: When command_bus.execute(command, **kwargs) is called (where kwargs can include subject_id and permissions for authorization):
    - The bus resolves the handler for the type of command.
    - It calls the handler's authorize(command, **kwargs) method (passing through relevant kwargs).
    - If authorization succeeds (returns True), it calls the handler's execute(command, **kwargs) method (passing through relevant kwargs).
    - It returns the result from the handler's execute method.
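The execution flow described above can be sketched with a toy dispatcher. This is not the library's implementation: ToyCommandBus, HandlerNotFound, PingCommand, and PingHandler are hypothetical names, the registry is a plain dictionary rather than a punq container, and treating a falsy authorize result as a denial is an assumption of this sketch.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict


class HandlerNotFound(Exception):
    """Stand-in for CommandHandlerNotFoundError (assumption)."""


class ToyCommandBus:
    """Illustrative dispatcher: resolve, authorize, execute, return."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], Any]] = {}

    def register(self, command_type: type, handler_factory: Callable[[], Any]) -> None:
        self._factories[command_type] = handler_factory

    async def execute(self, command: Any, **kwargs: Any) -> Any:
        factory = self._factories.get(type(command))  # 1. resolve by command type
        if factory is None:
            raise HandlerNotFound(type(command).__name__)
        handler = factory()
        if not await handler.authorize(command, **kwargs):  # 2. authorize first
            raise PermissionError("authorize() did not return True")
        return await handler.execute(command, **kwargs)  # 3. execute, 4. return result


@dataclass(frozen=True)
class PingCommand:  # hypothetical command
    message: str


calls = []  # records the order in which the bus invokes the handler


class PingHandler:  # hypothetical handler
    async def authorize(self, command: PingCommand, **kwargs: Any) -> bool:
        calls.append(("authorize", kwargs.get("subject_id")))
        return True

    async def execute(self, command: PingCommand, **kwargs: Any) -> str:
        calls.append(("execute", kwargs.get("subject_id")))
        return command.message.upper()


bus = ToyCommandBus()
bus.register(PingCommand, PingHandler)
result = asyncio.run(bus.execute(PingCommand("ping"), subject_id="user-1"))
```

After the call, calls holds the authorize entry followed by the execute entry, each with the forwarded subject_id, which mirrors the four steps listed above.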
4. CommandHandlerNotFoundError Exception
If the CommandBus receives a command for which no handler has been registered, it raises a CommandHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.command_bus and re-exported in castlecraft_engineer.exc.
It is important to ensure that every command has a corresponding handler registered with the bus.
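One way to catch a missing registration before a command is ever dispatched is a startup or test-time check. The sketch below assumes the application keeps its own mapping from command type to handler type (the same pairs it would pass to builder.command_bus.register); it does not rely on any introspection API of the library's CommandBus. All names here are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class CreateOrderCommand:  # hypothetical command
    order_id: str


@dataclass(frozen=True)
class CancelOrderCommand:  # hypothetical command
    order_id: str


class CreateOrderHandler:  # hypothetical handler
    pass


def find_unhandled(command_types: Iterable[type], registry: Dict[type, type]) -> List[str]:
    """Return the names of command types that have no handler in the registry."""
    return sorted(c.__name__ for c in command_types if c not in registry)


# The application's own table of registrations (an assumption of this sketch).
registry = {CreateOrderCommand: CreateOrderHandler}

missing = find_unhandled([CreateOrderCommand, CancelOrderCommand], registry)
```

Here missing contains CancelOrderCommand, so a test asserting that the list is empty would fail until the handler is added and registered.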
5. Code Example: Wiring Up and Executing
This example shows how to set up the DI container, register the handler, and execute a command via the bus.
import asyncio
import uuid

from sqlalchemy.ext.asyncio import AsyncSession  # For type hinting

from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService  # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import CommandHandlerNotFoundError, AuthorizationError

# Assuming RegisterUserCommand and RegisterUserHandler are defined as in the examples above
# from your_app.commands import RegisterUserCommand  # Defined above for this script
# from your_app.handlers import RegisterUserHandler  # Defined above for this script


async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession for this example if not using a real DB
    # In a real app, builder.with_async_database() would be used.
    class MockAsyncSession:
        async def commit(self):
            print("MockSession: commit called")

        async def rollback(self):
            print("MockSession: rollback called")
        # Add other methods your handler might expect from AsyncSession if any

    builder.register(AuthorizationService, AllowAllAuthorizationService)  # Example auth service
    builder.register(AsyncSession, instance=MockAsyncSession())  # Register an instance of the mock
    # builder.register(UserRepository)  # If UserRepository was used by the handler

    # Register CommandBus and the handler
    builder.with_command_bus()
    # The CommandBus instance is available on the builder after with_command_bus()
    # RegisterUserHandler depends on AsyncSession and AuthorizationService
    builder.command_bus.register(RegisterUserCommand, RegisterUserHandler)
    container = builder.build()

    # 2. Resolve CommandBus
    command_bus = container.resolve(CommandBus)

    # 3. Create and Execute Command
    initiating_user_id = uuid.uuid4()  # Example ID of the user performing the action
    cmd = RegisterUserCommand(
        username="testuser",
        email="test@example.com",
        password_hash="hashed_password_example",
        creator_id=initiating_user_id,
    )
    try:
        print(f"Dispatching command: {cmd}")
        # Pass subject_id for authorization context if needed by your auth service
        created_user_id = await command_bus.execute(cmd, subject_id=str(initiating_user_id))
        print(f"Command executed. Result (new user_id): {created_user_id}")
    except CommandHandlerNotFoundError:
        print(f"Error: No handler found for {type(cmd).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    # To run this example, ensure RegisterUserCommand and RegisterUserHandler
    # are defined in this script (as they are in the example snippets).
    asyncio.run(main())
This illustrates the complete flow from defining commands and handlers to dispatching them through the CommandBus with DI integration. The authorize method in the handler provides a hook for permission checks before execution.