Skip to content

Aggregates in Castlecraft Engineer

In Domain-Driven Design (DDD), an Aggregate is a cluster of domain objects (entities and value objects) that can be treated as a single unit. An Aggregate Root is the entry point to the Aggregate; all external references go to the Aggregate Root, and objects within the Aggregate boundary can only hold references to other Aggregate Roots.

castlecraft-engineer provides base classes and patterns to help you implement Aggregates effectively.

The Aggregate Base Class

The core abstraction is the castlecraft_engineer.abstractions.aggregate.Aggregate[TModel] base class. When you define your domain aggregates, they should typically inherit from this class, providing their corresponding persistence model type as TModel.

Key features provided by the Aggregate base class:

  • id: uuid.UUID: A unique identifier for the aggregate, typically a UUID.
  • version: int: An integer representing the version of the aggregate. This is crucial for implementing optimistic concurrency control. Each time an aggregate is modified and saved, its version should be incremented.
  • uncommitted_events: List[Event]: A list to store domain events that have occurred within the aggregate but have not yet been persisted or published. This follows the pattern of collecting events as changes happen.
  • _record_event(event: Event): A protected method to add a domain event to the uncommitted_events list. (Note: Your RevisionDraftAggregate example used _record_event. The base class might expose a public record_event or a protected _record_event. This example will use _record_event for consistency with your provided aggregate, assuming it's the intended pattern for subclasses to use).
  • pull_uncommitted_events() -> List[Event]: A method that returns all uncommitted events and clears the internal list. This is typically called after successfully persisting the aggregate's state.

The base Aggregate class is initialized with id and version (e.g., super().__init__(id=id, version=version)).

# Example of a simple Aggregate definition
import uuid
from dataclasses import dataclass
from typing import List
from sqlmodel import SQLModel, Field # For the corresponding SQLModel
from castlecraft_engineer.abstractions.aggregate import Aggregate
from castlecraft_engineer.abstractions.event import Event # Base Event class

class OrderItemModel(SQLModel): # A model for a child entity/value object, not an aggregate root itself
    product_id: uuid.UUID
    quantity: int
    price: float

class OrderModel(SQLModel, table=True): # The SQLModel for the Order Aggregate Root
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    customer_id: uuid.UUID
    # In a real system, order_items might be a JSONB field or a related table.
    # For simplicity in to_model/from_model, we'll focus on the aggregate's direct state.
    version: int = Field(default=1)

@dataclass(frozen=True)
class OrderPlaced(Event):
    order_id: uuid.UUID
    customer_id: uuid.UUID

@dataclass(frozen=True)
class OrderLineAdded(Event):
    order_id: uuid.UUID
    product_id: uuid.UUID
    quantity: int

class Order(Aggregate[OrderModel]):
    customer_id: uuid.UUID
    _items: List[OrderItemModel] # Internal state for order items

    def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
        super().__init__(id=id, version=version)
        self.customer_id = customer_id
        self._items = []

    @classmethod
    def place_order(cls, customer_id: uuid.UUID, creator_id: uuid.UUID) -> "Order": # Added creator_id for context",
        order_id = uuid.uuid4()
        # Initial version is 1 for a new aggregate
        order = cls(id=order_id, customer_id=customer_id, version=1)
        # Use _record_event as per RevisionDraftAggregate example
        order._record_event(OrderPlaced(order_id=order.id, customer_id=order.customer_id))
        return order

    def add_item(self, product_id: uuid.UUID, quantity: int, price: float):
        if quantity <= 0:
            raise ValueError("Quantity must be positive")
        # In a real system, check for duplicates, stock, business rules etc.
        self._items.append(OrderItemModel(product_id=product_id, quantity=quantity, price=price))
        self._record_event(OrderLineAdded(order_id=self.id, product_id=product_id, quantity=quantity))
        # Note: Modifying the aggregate should ideally lead to version increment when saved by the repository.

    # --- Persistence Mapping ---
    def to_model(self) -> OrderModel:
        """Converts the aggregate to its persistable SQLModel form."""
        # How _items are persisted would depend on your strategy (e.g., JSONB, separate table).
        # This example simplifies by not showing item persistence in the model directly.
        return OrderModel(id=self.id, customer_id=self.customer_id, version=self.version)

    @classmethod
    def from_model(cls, model: OrderModel) -> "Order":
        """Reconstructs the aggregate from its SQLModel form."""
        # Loading _items would happen here if they are persisted.
        order = cls(id=model.id, customer_id=model.customer_id, version=model.version)
        # Example: order._items = load_items_for_order(model.id) # Pseudocode
        return order

Your aggregate root class must implement to_model() (to convert the aggregate to its corresponding SQLModel for saving) and from_model() (to reconstruct the aggregate from a SQLModel when loading).

Aggregate Repositories

Persistence of aggregates is handled by repositories. castlecraft-engineer provides generic repository implementations for aggregates:

  • AggregateRepository[TID, TAggregate, TModel]: For synchronous operations.
  • AsyncAggregateRepository[TID, TAggregate, TModel]: For asynchronous operations (e.g., with AsyncSession).

These repositories are typically found in castlecraft_engineer.abstractions.repository (as per your RevisionDraftAggregateDomainRepository example). The generic arguments are: * TID: The type of the aggregate's identifier (e.g., uuid.UUID). * TAggregate: The type of the domain aggregate (e.g., Order). * TModel: The type of the persistence model (e.g., OrderModel).

Key Responsibilities and Features:

  1. Initialization: Concrete repositories must initialize the base repository by passing aggregate_cls and model_cls to super().__init__().
  2. Mapping to SQLModels: They use the to_model() method of your aggregate to get the SQLModel representation for saving and the from_model() method to reconstruct the aggregate when fetching.
  3. Optimistic Concurrency Control: When saving an aggregate (save() method), the repository checks the aggregate's version against the version in the database. If they don't match (meaning another process has modified the aggregate), an OptimisticConcurrencyError is raised. Upon successful save, the aggregate's version in the database is incremented. The save() method also updates the in-memory aggregate's version to match the newly persisted version.
  4. Session Management: The database session (e.g., Session or AsyncSession from SQLAlchemy/SQLModel) is external to the repository. You must pass the active session to the repository methods (get_by_id, save, delete). This promotes better transaction management at the application service or command handler level.
  5. Basic CRUD Operations:
    • get_by_id(id: TID, session: TSession) -> Optional[TAggregate]: Fetches an aggregate by its ID.
    • save(aggregate: TAggregate, session: TSession): Persists the aggregate. This handles both creation (insert) and updates. It also manages version incrementing for optimistic locking.
    • delete(aggregate: TAggregate, session: TSession): Deletes an aggregate. (Note: Deleting aggregates often involves more complex domain logic, like archiving or marking as deleted, which should be handled within your aggregate or domain service).

Example: Instantiating and Using an AsyncAggregateRepository

import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.repository import AsyncAggregateRepository
# Assuming Order, OrderModel are defined as in the previous example
from your_module.models import Order, OrderModel # Replace your_module

# Concrete repository implementation for the Order aggregate
class OrderRepository(AsyncAggregateRepository[uuid.UUID, Order, OrderModel]):
    def __init__(self):
        super().__init__(
            aggregate_cls=Order,
            model_cls=OrderModel,
        )

async def example_usage(session: AsyncSession, order_repo: OrderRepository):
    # Create a new order
    new_order = Order.place_order(customer_id=uuid.uuid4(), creator_id=uuid.uuid4()) # Added creator_id
    print(f"New order version before save: {new_order.version}") # Expected: 1

    # Save the new order
    await order_repo.save(new_order, session)
    # After save, the repository updates the in-memory aggregate's version.
    print(f"New order version after save: {new_order.version}") # Expected: 1 (for new) or new_version (for update)
    await session.commit() # Commit the transaction

    # Fetch the order
    fetched_order = await order_repo.get_by_id(new_order.id, session)
    if fetched_order:
        print(f"Fetched order: {fetched_order.id}, Version: {fetched_order.version}")
        fetched_order.add_item(product_id=uuid.uuid4(), quantity=2, price=10.99)
        # The add_item method records an event. Version is managed by the repository on save.

        # Save the updated order
        await order_repo.save(fetched_order, session)
        print(f"Updated order version after save: {fetched_order.version}") # Expected: 2 (or higher if other changes occurred)
        await session.commit()
    else:
        print("Order not found after saving!")
This repository would typically be instantiated and injected by your Dependency Injection container.

Common Exceptions

When working with aggregates and their repositories, you might encounter these common exceptions (defined in castlecraft_engineer.exc or directly in castlecraft_engineer.abstractions.repository and castlecraft_engineer.abstractions.aggregate):

  • RepositoryError: A generic base class for repository-related issues.
  • AggregateNotFoundError: Raised by get_by_id if an aggregate with the specified ID cannot be found. It can also be raised by save or delete if the aggregate is expected to exist but doesn't (though save typically handles inserts gracefully).
  • OptimisticConcurrencyError: Raised by the save method if a version mismatch is detected, indicating that the aggregate has been modified by another process since it was last loaded.

Properly handling these exceptions is crucial for building robust applications.

Summary

Aggregates are a cornerstone of DDD, helping to maintain consistency and manage complexity. castlecraft-engineer provides the Aggregate[TModel] base class and generic AggregateRepository / AsyncAggregateRepository implementations to streamline their use, including features like optimistic concurrency control and clear separation of persistence concerns. Remember to manage your database sessions externally and pass them to repository methods, and ensure your concrete repositories correctly initialize the base class with aggregate_cls and model_cls.