
Aggregates in CastleCraft Engineer

In Domain-Driven Design (DDD), an Aggregate is a cluster of domain objects (entities and value objects) that is treated as a single unit for data changes. The Aggregate Root is the only entry point to the Aggregate: all external references point to the root, and objects inside the boundary may reference one another but may refer to other Aggregates only through their Aggregate Roots.

castlecraft-engineer provides base classes and patterns to help you implement Aggregates effectively.

The Aggregate Base Class

The core abstraction is the castlecraft_engineer.abstractions.aggregate.Aggregate base class. When you define your domain aggregates, they should typically inherit from this class.

Key features provided by the Aggregate base class:

  • id: TID: The unique identifier of the aggregate. TID is a generic type parameter (e.g., uuid.UUID, int).
  • version: int: An integer representing the version of the aggregate. This is crucial for implementing optimistic concurrency control. Each time an aggregate is modified and saved, its version should be incremented.
  • uncommitted_events: List[Event]: A list to store domain events that have occurred within the aggregate but have not yet been persisted or published. This follows the pattern of collecting events as changes happen.
  • record_event(event: Event): A method to add a domain event to the uncommitted_events list.
  • pull_uncommitted_events() -> List[Event]: A method that returns all uncommitted events and clears the internal list. This is typically called after successfully persisting the aggregate's state.
# Example of a simple Aggregate definition
import uuid
from typing import List
from sqlmodel import SQLModel, Field # For the corresponding SQLModel
from castlecraft_engineer.abstractions.aggregate import Aggregate, Event

class OrderItemModel(SQLModel): # A model for a child entity, not an aggregate root itself
    product_id: uuid.UUID
    quantity: int
    price: float

class OrderModel(SQLModel, table=True): # The SQLModel for the Order Aggregate Root
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    customer_id: uuid.UUID
    # In a real system, order_items might be a JSONB field or a related table
    # For simplicity, we'll assume it's handled by the aggregate's logic
    version: int = Field(default=1)

class OrderPlaced(Event):
    order_id: uuid.UUID
    customer_id: uuid.UUID

class OrderLineAdded(Event):
    order_id: uuid.UUID
    product_id: uuid.UUID
    quantity: int

class Order(Aggregate[uuid.UUID, OrderModel]):
    def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
        super().__init__(id, version)
        self.customer_id = customer_id
        self._items: List[OrderItemModel] = [] # Internal state for order items

    @classmethod
    def place_order(cls, customer_id: uuid.UUID) -> "Order":
        order_id = uuid.uuid4()
        order = cls(id=order_id, customer_id=customer_id)
        order.record_event(OrderPlaced(order_id=order.id, customer_id=order.customer_id))
        return order

    def add_item(self, product_id: uuid.UUID, quantity: int, price: float):
        if quantity <= 0:
            raise ValueError("Quantity must be positive")
        # In a real system, check for duplicates, stock, etc.
        self._items.append(OrderItemModel(product_id=product_id, quantity=quantity, price=price))
        self.record_event(OrderLineAdded(order_id=self.id, product_id=product_id, quantity=quantity))
        # Note: Do not increment self.version here; the repository increments it when the aggregate is saved.

    # --- Persistence Mapping ---
    def to_model(self) -> OrderModel:
        # This method converts the aggregate to its persistable SQLModel form.
        # How _items are persisted would depend on your strategy (e.g., JSONB, separate table).
        # For this example, we'll assume OrderModel handles it or it's simplified.
        return OrderModel(id=self.id, customer_id=self.customer_id, version=self.version)

    @classmethod
    def from_model(cls, model: OrderModel) -> "Order":
        # This method reconstructs the aggregate from its SQLModel form.
        # Loading _items would happen here.
        order = cls(id=model.id, customer_id=model.customer_id, version=model.version)
        # order._items = ... load items from DB based on model or related data
        return order

Your aggregate root class must implement to_model() (to convert the aggregate to its corresponding SQLModel for saving) and from_model() (to reconstruct the aggregate from a SQLModel when loading).
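
The event-collection behaviour described above (record_event appends, pull_uncommitted_events returns the events and clears the list) can be sketched without the library. DemoEvent and DemoAggregate below are hypothetical stand-ins written for illustration; they mirror only the documented attributes and are not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in for the library's Event class."""
    name: str


@dataclass
class DemoAggregate:
    """Hypothetical stand-in mirroring the documented attributes only."""
    id: int
    version: int = 1
    uncommitted_events: List[DemoEvent] = field(default_factory=list)

    def record_event(self, event: DemoEvent) -> None:
        # Collect the event in memory; nothing is persisted or published yet.
        self.uncommitted_events.append(event)

    def pull_uncommitted_events(self) -> List[DemoEvent]:
        # Hand back a copy of the collected events and clear the internal list.
        events = list(self.uncommitted_events)
        self.uncommitted_events.clear()
        return events


aggregate = DemoAggregate(id=1)
aggregate.record_event(DemoEvent("OrderPlaced"))
aggregate.record_event(DemoEvent("OrderLineAdded"))

pulled = aggregate.pull_uncommitted_events()       # both events, in order
second_pull = aggregate.pull_uncommitted_events()  # empty: list was cleared
```

Clearing the list on pull means each event is handed out once, which is why the pull is typically deferred until the aggregate's state has been persisted successfully.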

Aggregate Repositories

Persistence of aggregates is handled by repositories. castlecraft-engineer provides generic repository implementations for aggregates:

  • AggregateRepository[TID, TAggregate, TModel]: For synchronous operations.
  • AsyncAggregateRepository[TID, TAggregate, TModel]: For asynchronous operations (e.g., with AsyncSession).

These repositories are found in castlecraft_engineer.abstractions.aggregate.

Key Responsibilities and Features:

  1. Mapping to SQLModels: They use the to_model() method of your aggregate to get the SQLModel representation for saving and the from_model() method to reconstruct the aggregate when fetching.
  2. Optimistic Concurrency Control: When saving an aggregate (save() method), the repository checks the aggregate's version against the version in the database. If they don't match (meaning another process has modified the aggregate), an OptimisticConcurrencyError is raised. Upon successful save, the aggregate's version in the database is incremented. The save() method also updates the in-memory aggregate's version to match the newly persisted version.
  3. Session Management: The database session (e.g., Session or AsyncSession from SQLAlchemy/SQLModel) is external to the repository. You must pass the active session to the repository methods (get_by_id, save, delete). This promotes better transaction management at the application service or command handler level.
  4. Basic CRUD Operations:
    • get_by_id(id: TID, session: TSession) -> Optional[TAggregate]: Fetches an aggregate by its ID.
    • save(aggregate: TAggregate, session: TSession): Persists the aggregate. This handles both creation (insert) and updates. It also manages version incrementing for optimistic locking.
    • delete(aggregate: TAggregate, session: TSession): Deletes an aggregate. (Note: Deleting aggregates often involves more complex domain logic, like archiving or marking as deleted, which should be handled within your aggregate or domain service).
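
The version check described in point 2 can be illustrated with a minimal in-memory sketch. Everything here is an assumption made for illustration: InMemoryOrderStore, DemoOrder and DemoConcurrencyError are hypothetical names, a dict stands in for the database table, and the real repositories may implement the check differently.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class DemoConcurrencyError(Exception):
    """Hypothetical stand-in for OptimisticConcurrencyError."""


@dataclass
class DemoOrder:
    id: int
    version: int = 1


class InMemoryOrderStore:
    """Hypothetical store; a dict maps aggregate id to persisted version."""

    def __init__(self) -> None:
        self._versions: Dict[int, int] = {}

    def get_by_id(self, order_id: int) -> Optional[DemoOrder]:
        version = self._versions.get(order_id)
        return None if version is None else DemoOrder(order_id, version)

    def save(self, order: DemoOrder) -> None:
        stored = self._versions.get(order.id)
        if stored is None:
            # Insert: persist the aggregate at its current version.
            self._versions[order.id] = order.version
            return
        if stored != order.version:
            # Someone else saved since this copy was loaded.
            raise DemoConcurrencyError(
                f"expected version {order.version}, found {stored}"
            )
        # Update: bump the stored version and sync the in-memory copy.
        self._versions[order.id] = stored + 1
        order.version = stored + 1


store = InMemoryOrderStore()
store.save(DemoOrder(id=1))

first = store.get_by_id(1)   # two copies loaded at version 1
second = store.get_by_id(1)

store.save(first)            # succeeds; stored version becomes 2
try:
    store.save(second)       # stale copy, still at version 1
    conflict_detected = False
except DemoConcurrencyError:
    conflict_detected = True
```

The second save fails because its copy was loaded before the first save committed, which is exactly the lost-update situation optimistic locking is meant to catch.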

Example: Instantiating and Using an AsyncAggregateRepository

import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository
# Assuming Order, OrderModel are defined as in the previous example
from your_module.models import Order, OrderModel # Replace your_module

# Concrete repository implementation for the Order aggregate
class OrderRepository(AsyncAggregateRepository[uuid.UUID, Order, OrderModel]):
    pass # The base class provides the core logic

async def example_usage(session: AsyncSession, order_repo: OrderRepository):
    # Create a new order
    new_order = Order.place_order(customer_id=uuid.uuid4())
    print(f"New order version before save: {new_order.version}") # Expected: 1

    # Save the new order
    await order_repo.save(new_order, session)
    # save() keeps the in-memory version in sync with the persisted version.
    # This call is an insert, so the version is still the initial value.
    print(f"New order version after save: {new_order.version}") # Expected: 1
    await session.commit() # Commit the transaction

    # Fetch the order
    fetched_order = await order_repo.get_by_id(new_order.id, session)
    if fetched_order:
        print(f"Fetched order: {fetched_order.id}, Version: {fetched_order.version}")
        fetched_order.add_item(product_id=uuid.uuid4(), quantity=2, price=10.99)
        # The add_item method should ideally not increment version directly;
        # version is managed by the repository on save.

        # Save the updated order
        await order_repo.save(fetched_order, session)
        print(f"Updated order version after save: {fetched_order.version}") # Expected: 2
        await session.commit()
    else:
        print("Order not found after saving!")
This repository would typically be instantiated and injected by your Dependency Injection container.
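
As a rough sketch of that wiring, a command handler can receive the repository through its constructor while the session is passed in per call, so the transaction boundary stays with the caller. FakeSession, FakeOrderRepository and PlaceOrderHandler are hypothetical names used only for this illustration; in a real application the repository would be an OrderRepository and the session an AsyncSession.

```python
import asyncio
from typing import Any, List


class FakeSession:
    """Hypothetical stand-in for AsyncSession; it only counts calls."""

    def __init__(self) -> None:
        self.commits = 0
        self.rollbacks = 0

    async def commit(self) -> None:
        self.commits += 1

    async def rollback(self) -> None:
        self.rollbacks += 1


class FakeOrderRepository:
    """Hypothetical stand-in exposing only save(aggregate, session)."""

    def __init__(self) -> None:
        self.saved: List[Any] = []

    async def save(self, aggregate: Any, session: FakeSession) -> None:
        self.saved.append(aggregate)


class PlaceOrderHandler:
    def __init__(self, order_repo: FakeOrderRepository) -> None:
        # The repository is injected; the handler never constructs it.
        self._order_repo = order_repo

    async def handle(self, order: Any, session: FakeSession) -> None:
        try:
            await self._order_repo.save(order, session)
            await session.commit()    # one commit per handled command
        except Exception:
            await session.rollback()  # undo partial work, then re-raise
            raise


session = FakeSession()
repo = FakeOrderRepository()
asyncio.run(PlaceOrderHandler(repo).handle("order-1", session))
```

Keeping the session out of the repository's constructor lets several repositories share one transaction when a command touches more than one aggregate.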

Common Exceptions

When working with aggregates and their repositories, you might encounter these common exceptions (defined in castlecraft_engineer.abstractions.aggregate and re-exported in castlecraft_engineer.exc):

  • RepositoryError: A generic base class for repository-related issues.
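  • AggregateNotFoundError: Raised when an aggregate with the specified ID cannot be found. Because get_by_id is typed as returning Optional[TAggregate], callers should be prepared for either a None result or this exception when fetching. It can also be raised by save or delete if the aggregate is expected to exist but doesn't (though save typically handles inserts gracefully).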
  • OptimisticConcurrencyError: Raised by the save method if a version mismatch is detected, indicating that the aggregate has been modified by another process since it was last loaded.

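Handle these exceptions explicitly rather than letting them surface as generic failures. A version conflict usually calls for reloading the aggregate and retrying the operation, or reporting the conflict to the caller; a missing aggregate usually maps to a not-found response.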
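
One common way to react to a version conflict is a bounded retry. The helper below is a minimal sketch under stated assumptions: retry_on_conflict and DemoConcurrencyError are hypothetical names, and in real code the except clause would catch the library's OptimisticConcurrencyError instead. The operation passed in is expected to reload the aggregate on every call so that each attempt works on a fresh version.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class DemoConcurrencyError(Exception):
    """Hypothetical stand-in for OptimisticConcurrencyError."""


def retry_on_conflict(operation: Callable[[], T], max_attempts: int = 3) -> T:
    """Run operation, retrying when it raises DemoConcurrencyError.

    The last failure is re-raised once max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except DemoConcurrencyError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")


attempts: List[int] = []


def flaky_update() -> str:
    # Simulates load -> modify -> save, conflicting on the first two tries.
    attempts.append(1)
    if len(attempts) < 3:
        raise DemoConcurrencyError("stale version")
    return "saved"


result = retry_on_conflict(flaky_update)
```

Retrying is only safe when the operation can be re-applied to the reloaded state; if the user's decision depended on the stale data, surfacing the conflict is usually the better choice.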

Summary

Aggregates are a cornerstone of DDD, helping to maintain consistency and manage complexity. castlecraft-engineer provides the Aggregate base class and generic AggregateRepository / AsyncAggregateRepository implementations to streamline their use, including features like optimistic concurrency control and clear separation of persistence concerns. Remember to manage your database sessions externally and pass them to repository methods.