Aggregates in CastleCraft Engineer¶
In Domain-Driven Design (DDD), an Aggregate is a cluster of domain objects (entities and value objects) that can be treated as a single unit. An Aggregate Root is the entry point to the Aggregate; all external references go to the Aggregate Root, and objects within the Aggregate boundary can only hold references to other Aggregate Roots.
castlecraft-engineer provides base classes and patterns to help you implement Aggregates effectively.
The Aggregate Base Class¶
The core abstraction is the castlecraft_engineer.abstractions.aggregate.Aggregate base class. When you define your domain aggregates, they should typically inherit from this class.
Key features provided by the Aggregate base class:
id: TID: A generic typeTIDrepresents the unique identifier for the aggregate (e.g.,uuid.UUID,int).version: int: An integer representing the version of the aggregate. This is crucial for implementing optimistic concurrency control. Each time an aggregate is modified and saved, its version should be incremented.uncommitted_events: List[Event]: A list to store domain events that have occurred within the aggregate but have not yet been persisted or published. This follows the pattern of collecting events as changes happen.record_event(event: Event): A method to add a domain event to theuncommitted_eventslist.pull_uncommitted_events() -> List[Event]: A method that returns all uncommitted events and clears the internal list. This is typically called after successfully persisting the aggregate's state.
# Example of a simple Aggregate definition
import uuid
from typing import List
from sqlmodel import SQLModel, Field # For the corresponding SQLModel
from castlecraft_engineer.abstractions.aggregate import Aggregate, Event
class OrderItemModel(SQLModel): # A model for a child entity, not an aggregate root itself
product_id: uuid.UUID
quantity: int
price: float
class OrderModel(SQLModel, table=True): # The SQLModel for the Order Aggregate Root
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
customer_id: uuid.UUID
# In a real system, order_items might be a JSONB field or a related table
# For simplicity, we'll assume it's handled by the aggregate's logic
version: int = Field(default=1)
class OrderPlaced(Event):
order_id: uuid.UUID
customer_id: uuid.UUID
class OrderLineAdded(Event):
order_id: uuid.UUID
product_id: uuid.UUID
quantity: int
class Order(Aggregate[uuid.UUID, OrderModel]):
def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
super().__init__(id, version)
self.customer_id = customer_id
self._items: List[OrderItemModel] = [] # Internal state for order items
@classmethod
def place_order(cls, customer_id: uuid.UUID) -> "Order":
order_id = uuid.uuid4()
order = cls(id=order_id, customer_id=customer_id)
order.record_event(OrderPlaced(order_id=order.id, customer_id=order.customer_id))
return order
def add_item(self, product_id: uuid.UUID, quantity: int, price: float):
if quantity <= 0:
raise ValueError("Quantity must be positive")
# In a real system, check for duplicates, stock, etc.
self._items.append(OrderItemModel(product_id=product_id, quantity=quantity, price=price))
self.record_event(OrderLineAdded(order_id=self.id, product_id=product_id, quantity=quantity))
# Note: Modifying the aggregate should ideally lead to version increment when saved
# --- Persistence Mapping ---
def to_model(self) -> OrderModel:
# This method converts the aggregate to its persistable SQLModel form.
# How _items are persisted would depend on your strategy (e.g., JSONB, separate table).
# For this example, we'll assume OrderModel handles it or it's simplified.
return OrderModel(id=self.id, customer_id=self.customer_id, version=self.version)
@classmethod
def from_model(cls, model: OrderModel) -> "Order":
# This method reconstructs the aggregate from its SQLModel form.
# Loading _items would happen here.
order = cls(id=model.id, customer_id=model.customer_id, version=model.version)
# order._items = ... load items from DB based on model or related data
return order
Your aggregate root class must implement to_model() (to convert the aggregate to its corresponding SQLModel for saving) and from_model() (to reconstruct the aggregate from a SQLModel when loading).
Aggregate Repositories¶
Persistence of aggregates is handled by repositories. castlecraft-engineer provides generic repository implementations for aggregates:
AggregateRepository[TID, TAggregate, TModel]: For synchronous operations.AsyncAggregateRepository[TID, TAggregate, TModel]: For asynchronous operations (e.g., withAsyncSession).
These repositories are found in castlecraft_engineer.abstractions.aggregate.
Key Responsibilities and Features:¶
- Mapping to SQLModels: They use the
to_model()method of your aggregate to get theSQLModelrepresentation for saving and thefrom_model()method to reconstruct the aggregate when fetching. - Optimistic Concurrency Control: When saving an aggregate (
save()method), the repository checks the aggregate'sversionagainst the version in the database. If they don't match (meaning another process has modified the aggregate), anOptimisticConcurrencyErroris raised. Upon successful save, the aggregate's version in the database is incremented. Thesave()method also updates the in-memory aggregate's version to match the newly persisted version. - Session Management: The database session (e.g.,
SessionorAsyncSessionfrom SQLAlchemy/SQLModel) is external to the repository. You must pass the active session to the repository methods (get_by_id,save,delete). This promotes better transaction management at the application service or command handler level. - Basic CRUD Operations:
get_by_id(id: TID, session: TSession) -> Optional[TAggregate]: Fetches an aggregate by its ID.save(aggregate: TAggregate, session: TSession): Persists the aggregate. This handles both creation (insert) and updates. It also manages version incrementing for optimistic locking.delete(aggregate: TAggregate, session: TSession): Deletes an aggregate. (Note: Deleting aggregates often involves more complex domain logic, like archiving or marking as deleted, which should be handled within your aggregate or domain service).
Example: Instantiating and Using an AsyncAggregateRepository¶
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository
# Assuming Order, OrderModel are defined as in the previous example
from your_module.models import Order, OrderModel # Replace your_module
# Concrete repository implementation for the Order aggregate
class OrderRepository(AsyncAggregateRepository[uuid.UUID, Order, OrderModel]):
pass # The base class provides the core logic
async def example_usage(session: AsyncSession, order_repo: OrderRepository):
# Create a new order
new_order = Order.place_order(customer_id=uuid.uuid4())
print(f"New order version before save: {new_order.version}") # Expected: 1
# Save the new order
await order_repo.save(new_order, session)
# After save, the repository updates the in-memory aggregate's version if it was an insert
# or if it was an update and the DB version was incremented.
print(f"New order version after save: {new_order.version}") # Expected: 1 (for new) or new_version (for update)
await session.commit() # Commit the transaction
# Fetch the order
fetched_order = await order_repo.get_by_id(new_order.id, session)
if fetched_order:
print(f"Fetched order: {fetched_order.id}, Version: {fetched_order.version}")
fetched_order.add_item(product_id=uuid.uuid4(), quantity=2, price=10.99)
# The add_item method should ideally not increment version directly;
# version is managed by the repository on save.
# Save the updated order
await order_repo.save(fetched_order, session)
print(f"Updated order version after save: {fetched_order.version}") # Expected: 2
await session.commit()
else:
print("Order not found after saving!")
Common Exceptions¶
When working with aggregates and their repositories, you might encounter these common exceptions (defined in castlecraft_engineer.abstractions.aggregate and re-exported in castlecraft_engineer.exc):
RepositoryError: A generic base class for repository-related issues.AggregateNotFoundError: Raised byget_by_idif an aggregate with the specified ID cannot be found. It can also be raised bysaveordeleteif the aggregate is expected to exist but doesn't (thoughsavetypically handles inserts gracefully).OptimisticConcurrencyError: Raised by thesavemethod if a version mismatch is detected, indicating that the aggregate has been modified by another process since it was last loaded.
Properly handling these exceptions is crucial for building robust applications.
Summary¶
Aggregates are a cornerstone of DDD, helping to maintain consistency and manage complexity. castlecraft-engineer provides the Aggregate base class and generic AggregateRepository / AsyncAggregateRepository implementations to streamline their use, including features like optimistic concurrency control and clear separation of persistence concerns.
Remember to manage your database sessions externally and pass them to repository methods.