Aggregates in Castlecraft Engineer¶
In Domain-Driven Design (DDD), an Aggregate is a cluster of domain objects (entities and value objects) that can be treated as a single unit. An Aggregate Root is the entry point to the Aggregate; all external references go to the Aggregate Root, and objects within the Aggregate boundary can only hold references to other Aggregate Roots.
castlecraft-engineer
provides base classes and patterns to help you implement Aggregates effectively.
The Aggregate
Base Class¶
The core abstraction is the castlecraft_engineer.abstractions.aggregate.Aggregate[TModel]
base class. When you define your domain aggregates, they should typically inherit from this class, providing their corresponding persistence model type as TModel
.
Key features provided by the Aggregate
base class:
id: uuid.UUID
: A unique identifier for the aggregate, typically a UUID.version: int
: An integer representing the version of the aggregate. This is crucial for implementing optimistic concurrency control. Each time an aggregate is modified and saved, its version should be incremented.uncommitted_events: List[Event]
: A list to store domain events that have occurred within the aggregate but have not yet been persisted or published. This follows the pattern of collecting events as changes happen._record_event(event: Event)
: A protected method to add a domain event to theuncommitted_events
list. (Note: YourRevisionDraftAggregate
example used_record_event
. The base class might expose a publicrecord_event
or a protected_record_event
. This example will use_record_event
for consistency with your provided aggregate, assuming it's the intended pattern for subclasses to use).pull_uncommitted_events() -> List[Event]
: A method that returns all uncommitted events and clears the internal list. This is typically called after successfully persisting the aggregate's state.
The base Aggregate
class is initialized with id
and version
(e.g., super().__init__(id=id, version=version)
).
# Example of a simple Aggregate definition
import uuid
from dataclasses import dataclass
from typing import List
from sqlmodel import SQLModel, Field # For the corresponding SQLModel
from castlecraft_engineer.abstractions.aggregate import Aggregate
from castlecraft_engineer.abstractions.event import Event # Base Event class
class OrderItemModel(SQLModel): # A model for a child entity/value object, not an aggregate root itself
product_id: uuid.UUID
quantity: int
price: float
class OrderModel(SQLModel, table=True): # The SQLModel for the Order Aggregate Root
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
customer_id: uuid.UUID
# In a real system, order_items might be a JSONB field or a related table.
# For simplicity in to_model/from_model, we'll focus on the aggregate's direct state.
version: int = Field(default=1)
@dataclass(frozen=True)
class OrderPlaced(Event):
order_id: uuid.UUID
customer_id: uuid.UUID
@dataclass(frozen=True)
class OrderLineAdded(Event):
order_id: uuid.UUID
product_id: uuid.UUID
quantity: int
class Order(Aggregate[OrderModel]):
customer_id: uuid.UUID
_items: List[OrderItemModel] # Internal state for order items
def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
super().__init__(id=id, version=version)
self.customer_id = customer_id
self._items = []
@classmethod
def place_order(cls, customer_id: uuid.UUID, creator_id: uuid.UUID) -> "Order": # Added creator_id for context",
order_id = uuid.uuid4()
# Initial version is 1 for a new aggregate
order = cls(id=order_id, customer_id=customer_id, version=1)
# Use _record_event as per RevisionDraftAggregate example
order._record_event(OrderPlaced(order_id=order.id, customer_id=order.customer_id))
return order
def add_item(self, product_id: uuid.UUID, quantity: int, price: float):
if quantity <= 0:
raise ValueError("Quantity must be positive")
# In a real system, check for duplicates, stock, business rules etc.
self._items.append(OrderItemModel(product_id=product_id, quantity=quantity, price=price))
self._record_event(OrderLineAdded(order_id=self.id, product_id=product_id, quantity=quantity))
# Note: Modifying the aggregate should ideally lead to version increment when saved by the repository.
# --- Persistence Mapping ---
def to_model(self) -> OrderModel:
"""Converts the aggregate to its persistable SQLModel form."""
# How _items are persisted would depend on your strategy (e.g., JSONB, separate table).
# This example simplifies by not showing item persistence in the model directly.
return OrderModel(id=self.id, customer_id=self.customer_id, version=self.version)
@classmethod
def from_model(cls, model: OrderModel) -> "Order":
"""Reconstructs the aggregate from its SQLModel form."""
# Loading _items would happen here if they are persisted.
order = cls(id=model.id, customer_id=model.customer_id, version=model.version)
# Example: order._items = load_items_for_order(model.id) # Pseudocode
return order
Your aggregate root class must implement to_model()
(to convert the aggregate to its corresponding SQLModel for saving) and from_model()
(to reconstruct the aggregate from a SQLModel when loading).
Aggregate Repositories¶
Persistence of aggregates is handled by repositories. castlecraft-engineer
provides generic repository implementations for aggregates:
AggregateRepository[TID, TAggregate, TModel]
: For synchronous operations.AsyncAggregateRepository[TID, TAggregate, TModel]
: For asynchronous operations (e.g., withAsyncSession
).
These repositories are typically found in castlecraft_engineer.abstractions.repository
(as per your RevisionDraftAggregateDomainRepository
example).
The generic arguments are:
* TID
: The type of the aggregate's identifier (e.g., uuid.UUID
).
* TAggregate
: The type of the domain aggregate (e.g., Order
).
* TModel
: The type of the persistence model (e.g., OrderModel
).
Key Responsibilities and Features:¶
- Initialization: Concrete repositories must initialize the base repository by passing
aggregate_cls
andmodel_cls
tosuper().__init__()
. - Mapping to SQLModels: They use the
to_model()
method of your aggregate to get theSQLModel
representation for saving and thefrom_model()
method to reconstruct the aggregate when fetching. - Optimistic Concurrency Control: When saving an aggregate (
save()
method), the repository checks the aggregate'sversion
against the version in the database. If they don't match (meaning another process has modified the aggregate), anOptimisticConcurrencyError
is raised. Upon successful save, the aggregate's version in the database is incremented. Thesave()
method also updates the in-memory aggregate's version to match the newly persisted version. - Session Management: The database session (e.g.,
Session
orAsyncSession
from SQLAlchemy/SQLModel) is external to the repository. You must pass the active session to the repository methods (get_by_id
,save
,delete
). This promotes better transaction management at the application service or command handler level. - Basic CRUD Operations:
get_by_id(id: TID, session: TSession) -> Optional[TAggregate]
: Fetches an aggregate by its ID.save(aggregate: TAggregate, session: TSession)
: Persists the aggregate. This handles both creation (insert) and updates. It also manages version incrementing for optimistic locking.delete(aggregate: TAggregate, session: TSession)
: Deletes an aggregate. (Note: Deleting aggregates often involves more complex domain logic, like archiving or marking as deleted, which should be handled within your aggregate or domain service).
Example: Instantiating and Using an AsyncAggregateRepository
¶
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.repository import AsyncAggregateRepository
# Assuming Order, OrderModel are defined as in the previous example
from your_module.models import Order, OrderModel # Replace your_module
# Concrete repository implementation for the Order aggregate
class OrderRepository(AsyncAggregateRepository[uuid.UUID, Order, OrderModel]):
def __init__(self):
super().__init__(
aggregate_cls=Order,
model_cls=OrderModel,
)
async def example_usage(session: AsyncSession, order_repo: OrderRepository):
# Create a new order
new_order = Order.place_order(customer_id=uuid.uuid4(), creator_id=uuid.uuid4()) # Added creator_id
print(f"New order version before save: {new_order.version}") # Expected: 1
# Save the new order
await order_repo.save(new_order, session)
# After save, the repository updates the in-memory aggregate's version.
print(f"New order version after save: {new_order.version}") # Expected: 1 (for new) or new_version (for update)
await session.commit() # Commit the transaction
# Fetch the order
fetched_order = await order_repo.get_by_id(new_order.id, session)
if fetched_order:
print(f"Fetched order: {fetched_order.id}, Version: {fetched_order.version}")
fetched_order.add_item(product_id=uuid.uuid4(), quantity=2, price=10.99)
# The add_item method records an event. Version is managed by the repository on save.
# Save the updated order
await order_repo.save(fetched_order, session)
print(f"Updated order version after save: {fetched_order.version}") # Expected: 2 (or higher if other changes occurred)
await session.commit()
else:
print("Order not found after saving!")
Common Exceptions¶
When working with aggregates and their repositories, you might encounter these common exceptions (defined in castlecraft_engineer.exc
or directly in castlecraft_engineer.abstractions.repository
and castlecraft_engineer.abstractions.aggregate
):
RepositoryError
: A generic base class for repository-related issues.AggregateNotFoundError
: Raised byget_by_id
if an aggregate with the specified ID cannot be found. It can also be raised bysave
ordelete
if the aggregate is expected to exist but doesn't (thoughsave
typically handles inserts gracefully).OptimisticConcurrencyError
: Raised by thesave
method if a version mismatch is detected, indicating that the aggregate has been modified by another process since it was last loaded.
Properly handling these exceptions is crucial for building robust applications.
Summary¶
Aggregates are a cornerstone of DDD, helping to maintain consistency and manage complexity. castlecraft-engineer
provides the Aggregate[TModel]
base class and generic AggregateRepository
/ AsyncAggregateRepository
implementations to streamline their use, including features like optimistic concurrency control and clear separation of persistence concerns.
Remember to manage your database sessions externally and pass them to repository methods, and ensure your concrete repositories correctly initialize the base class with aggregate_cls
and model_cls
.