Skip to content

Unit of Work (UoW) Pattern

The Unit of Work (UoW) pattern is a design pattern that tracks changes to objects during a business transaction and coordinates the writing of these changes to a data store (like a database) atomically. It ensures that all operations within a single logical transaction are either all successfully completed (committed) or all rolled back in case of an error, maintaining data consistency.

UoW Implementation in This Library

In this library, when using SQLAlchemy or SQLModel for persistence, the Session object serves as the concrete implementation of the Unit of Work. The session tracks changes to ORM-mapped objects and handles the transaction lifecycle (commit, rollback).

The AggregateRepository and AsyncAggregateRepository are designed to operate within an existing Unit of Work. Their methods (get_by_id, save, delete_by_id) require a session object to be passed to them. They use this session to interact with the database but do not manage the session's lifecycle (i.e., they don't create, commit, or rollback the session themselves).

Leveraging Execution Context for Handlers

The execute methods of CommandHandler and QueryHandler are defined with *args and **kwargs:

async def execute(
    self,
    command: TCommand, # or query: TQuery
    *args,
    subject_id: Optional[str] = None, # Explicit for user/subject
    permissions: List[Permission] = [], # Explicit for authorization
    **kwargs, # Flexible execution context
) -> Any:
    # ... implementation ...

The **kwargs parameter provides a flexible mechanism to pass additional runtime information, collectively referred to as the "Execution Context", to the handler. This context can include various pieces of data or dependencies needed by the handler beyond the core command/query data.

The CommandBus and QueryBus forward these *args and **kwargs to the handler's execute method. This allows the layer calling the bus (e.g., an application service, API endpoint, or a dedicated UoW manager) to inject necessary contextual information.

Key Uses of Execution Context:

  1. Passing the Unit of Work (Database Session): As detailed below, the most common use is to pass the database session object, allowing handlers to perform operations within a managed transaction.

    # Example: Calling the bus with a session
    async with session_maker() as session:
        await command_bus.execute(my_command, session=session, subject_id='user-123')
        # The handler for my_command will receive 'session' and 'subject_id' in its **kwargs
    

  2. Correlation ID / Trace ID: For distributed tracing and cohesive logging across different services or components involved in a single request. This helps in debugging and monitoring complex flows.

    # Example: Passing a correlation ID
    correlation_id = "some-unique-trace-id"
    await command_bus.execute(my_command, session=s, correlation_id=correlation_id)
    

  3. Tenant ID (for Multi-Tenant Applications): In multi-tenant systems, the tenant_id can be passed to ensure that operations and data access are correctly scoped to the current tenant. This is crucial for data isolation and security.

    # Example: Passing a tenant ID
    current_tenant_id = "tenant-abc"
    await query_bus.execute(my_query, session=s, tenant_id=current_tenant_id)
    

  4. Feature Flags / Toggles: To dynamically alter handler behavior based on runtime feature flag configurations. This allows for gradual rollouts, A/B testing, or conditional logic without code redeployments.

    # Example: Passing feature flag information
    feature_flags = {"new_inventory_check": True, "use_updated_pricing": False}
    await command_bus.execute(update_product_command, session=s, features=feature_flags)
    

  5. Client Information (User Agent, IP Address): If handlers need to adapt their logic, perform specific logging, or apply rate limiting based on the client making the request (e.g., web browser, mobile app, API client).

    # Example: Passing client details
    client_info = {"user_agent": "WebApp/1.0", "ip_address": "192.168.1.100"}
    await command_bus.execute(login_command, session=s, client=client_info)
    

  6. Internationalization/Localization (i18n/l10n) Context: To pass the current locale or language preference (e.g., Accept-Language header value). Handlers can use this to return localized messages or interact with localized data.

    # Example: Passing locale information
    locale = "en-US"
    await query_bus.execute(get_product_details_query, session=s, locale=locale)
    

  7. API Version: If your application supports multiple API versions, handlers might need to know which version is being invoked to adapt their response format or behavior accordingly.

    # Example: Passing API version
    api_version = "v2"
    await command_bus.execute(create_resource_command, session=s, api_version=api_version)
    

This design keeps Command/Query objects focused on their specific intent and data, while the execution context provides a clean way for handlers to access broader, request-specific or environmental information.

Patterns for Managing the UoW Lifecycle

The responsibility for creating the session, passing it to the relevant components (like repositories via handlers using the execution context), and managing its lifecycle (commit/rollback) lies outside the repositories and often outside the handlers themselves. Here are common patterns:

This is the most common and generally recommended approach when working with Command and Query buses. The UoW is managed by a component that wraps the bus execution.

a. Decorator or Context Manager Pattern

A decorator or context manager can encapsulate the UoW logic, ensuring that a session is created, made available (e.g., via execution context), and then committed or rolled back.

# Conceptual Example: UoW Context Manager
class UnitOfWorkManager:
    def __init__(self, session_maker):
        self._session_maker = session_maker
        self.session = None

    async def __aenter__(self):
        self.session = self._session_maker()
        # Could also set other context items here, e.g., from a request object
        return self # Or just the session, or a dict of context items

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            try:
                if exc_type:
                    await self.session.rollback()
                else:
                    await self.session.commit()
            finally:
                await self.session.close()

# Usage:
# uow_manager = UnitOfWorkManager(async_session_maker)
# async with uow_manager as uow_ctx:
#     await command_bus.execute(some_command, session=uow_ctx.session, correlation_id=uow_ctx.correlation_id)
This pattern cleanly separates UoW management from the application logic.

b. Application Service Layer Pattern

An application service method can orchestrate the UoW and other context. The service method creates the session, gathers other contextual data, calls one or more bus methods (passing the session and context), and then commits or rolls back.

# Conceptual Example: Service managing UoW and context
class OrderService:
    def __init__(self, command_bus, session_maker):
        self._command_bus = command_bus
        self._session_maker = session_maker

    async def place_order(self, order_data, user_id, trace_id, client_ip):
        async with self._session_maker() as session:
            try:
                create_order_command = CreateOrderCommand(**order_data)
                order_id = await self._command_bus.execute(
                    create_order_command,
                    session=session,
                    subject_id=user_id,
                    correlation_id=trace_id,
                    client_ip=client_ip
                )
                # ... potentially other commands or logic within the same UoW ...
                await session.commit()
                return order_id
            except Exception:
                await session.rollback()
                raise

c. Middleware Pattern (e.g., for Web APIs)

In web application frameworks (like FastAPI, Starlette), middleware can manage the UoW lifecycle and gather other contextual data per HTTP request. The middleware creates a session, extracts headers or request info, makes them available (e.g., via request state or by passing them directly when calling services/bus), and commits/rolls back at the end of the request.

# Conceptual FastAPI Middleware Example
# @app.middleware("http")
# async def db_session_middleware(request: Request, call_next):
#     async with async_session_maker() as session:
#         request.state.db_session = session
#         request.state.correlation_id = request.headers.get("X-Correlation-ID")
#         try:
#             response = await call_next(request)
#             # In a real app, the endpoint would call a service,
#             # which then calls the bus, passing session and correlation_id from request.state
#             await session.commit() # Commit might be handled by the service layer or UoW manager
#         except Exception:
#             await session.rollback() # Rollback might be handled by the service layer or UoW manager
#             raise
#         finally:
#             await session.close()
#     return response

2. Handler-Managed Unit of Work (Using SessionMaker)

In some scenarios, a handler itself might need to manage the UoW. This is achieved by injecting a sessionmaker (e.g., SQLAlchemy's async_sessionmaker or sessionmaker) into the handler. The handler then creates a session, uses it for its operations (including calls to repositories), and manages the commit/rollback.

This pattern can be useful for: - Complex handlers that perform multiple related database operations. - Background tasks or event handlers that are not part of a request-response cycle managed by external UoW. - When a specific operation needs its own isolated transaction, independent of a broader UoW.

class CreateItemCommandHandler(CommandHandler[CreateItemCommand, ItemId]):
    def __init__(self, repository: ItemRepository, session_maker: async_sessionmaker):
        self._repository = repository
        self._session_maker = session_maker
        self._logger = logging.getLogger(__name__)

    async def execute(self, command: CreateItemCommand, **kwargs) -> ItemId:
        self._logger.info(f"Executing CreateItemCommand: {command}")
        item_aggregate = ItemAggregate.create(name=command.name, description=command.description)
        # Handler could also access other context from kwargs if needed:
        # correlation_id = kwargs.get('correlation_id')

        async with self._session_maker() as session: # Handler creates and manages session
            try:
                await self._repository.save(session=session, aggregate=item_aggregate)
                # ... potentially other operations using this session ...
                await session.commit()
                self._logger.info(f"Item {item_aggregate.id} created and session committed.")
                return item_aggregate.id
            except Exception as e:
                self._logger.error(f"Error creating item: {e}", exc_info=True)
                await session.rollback()
                raise

Considerations for Handler-Managed UoW: - Increased Handler Complexity: Handlers become responsible for transaction management, which can mix business logic with infrastructure concerns. - Testing: May require more setup to mock the sessionmaker and session behavior. - Composition: If a handler managing its own UoW is called by another service that also manages a UoW, you might end up with nested transactions or unexpected behavior if not handled carefully (though SQLAlchemy sessions can often handle nesting gracefully).

It's generally preferred to keep UoW management external to handlers for better separation of concerns, but this pattern provides flexibility when needed.

Benefits of the Unit of Work Pattern

  • Atomicity: Ensures that a group of operations are treated as a single, indivisible unit.
  • Consistency: Helps maintain data integrity by committing changes only if all operations succeed.
  • Reduced Database Chattiness: Changes can be batched and sent to the database in a more optimized way (often handled by the ORM session).
  • Clear Transaction Boundaries: Makes the scope of a business transaction explicit.