Event Sourced Aggregates
An Event Sourced Aggregate is a specialized type of Aggregate that derives its state entirely from a sequence of historical events. Instead of storing the current state directly, it records every change as an immutable Event. When the aggregate needs to be loaded, its state is reconstructed by replaying these events.
Castlecraft Engineer provides the EventSourcedAggregate base class, which extends the standard Aggregate with the necessary mechanisms for event sourcing.
Key Characteristics
State Mutability through Events
The fundamental principle is that the state of an EventSourcedAggregate is never mutated directly. All state changes result from applying domain events. This provides a full audit trail and enables features such as temporal queries and replaying history.
Event Appliers
Event appliers are methods within your aggregate that change the aggregate's state in response to a specific event type. You register them inside _register_event_appliers (which you must implement) using the helper _register_applier.
Example of registering an applier:
# In your MyOrderAggregate(EventSourcedAggregate) class:
def _register_event_appliers(self) -> None:
    self._register_applier(OrderCreated, self._apply_order_created)
    self._register_applier(OrderItemAdded, self._apply_item_added)

def _apply_order_created(self, event: OrderCreated) -> None:
    # Assuming OrderCreated is an Event dataclass
    self.customer_id = event.customer_id
    self.status = "PENDING"  # Initial state

def _apply_item_added(self, event: OrderItemAdded) -> None:
    # Assuming OrderItemAdded is an Event dataclass
    self._items.append(OrderItem(product_id=event.product_id, quantity=event.quantity))
Recording Events
When a command method on your aggregate results in a state change, it should create an event object and then call self._record_and_apply_event(event). This method does two things:
1. Records the event in the list of uncommitted events (inherited from the base Aggregate).
2. Applies the event to the current instance using the registered applier, thereby updating its state and incrementing its version.
Example in a command method:
# In your MyOrderAggregate(EventSourcedAggregate) class:
def add_item(self, product_id: UUID, quantity: int, price: float) -> None:
    # Validate business rules before any event is created
    if quantity <= 0:
        raise ValueError("Quantity must be positive.")
    # ... other business rules ...
    item_added_event = OrderItemAdded(order_id=self.id, product_id=product_id, quantity=quantity, price=price)
    self._record_and_apply_event(item_added_event)
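The record-then-apply flow described above can be sketched without the library. The following is a minimal, self-contained illustration, not Castlecraft Engineer's actual implementation: SketchAggregate, Cart, ItemAdded, and the uncommitted_events attribute name are all hypothetical stand-ins, and the sketch deliberately omits handling for events that have no registered applier.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Type


@dataclass(frozen=True)
class ItemAdded:
    """Hypothetical event used only for this sketch."""
    product_id: str
    quantity: int


class SketchAggregate:
    """Illustrative stand-in for an event-sourced base class."""

    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = -1  # no events applied yet
        self.uncommitted_events: List[object] = []
        self._appliers: Dict[Type, Callable[[object], None]] = {}
        self._register_event_appliers()

    def _register_event_appliers(self) -> None:
        raise NotImplementedError

    def _register_applier(self, event_type: Type, applier: Callable) -> None:
        self._appliers[event_type] = applier

    def _apply_event(self, event: object) -> None:
        # Dispatch on the event's type, then bump the version.
        self._appliers[type(event)](event)
        self.version += 1

    def _record_and_apply_event(self, event: object) -> None:
        # 1. Remember the event so it can be persisted later.
        self.uncommitted_events.append(event)
        # 2. Update in-memory state through the registered applier.
        self._apply_event(event)


class Cart(SketchAggregate):
    def __init__(self, aggregate_id: str) -> None:
        self.quantities: Dict[str, int] = {}
        super().__init__(aggregate_id)

    def _register_event_appliers(self) -> None:
        self._register_applier(ItemAdded, self._apply_item_added)

    def add_item(self, product_id: str, quantity: int) -> None:
        # The command validates, then records an event; it never mutates state itself.
        if quantity <= 0:
            raise ValueError("Quantity must be positive.")
        self._record_and_apply_event(ItemAdded(product_id, quantity))

    def _apply_item_added(self, event: ItemAdded) -> None:
        current = self.quantities.get(event.product_id, 0)
        self.quantities[event.product_id] = current + event.quantity


cart = Cart("cart-1")
cart.add_item("sku-1", 2)
cart.add_item("sku-1", 3)
```

In this sketch the command method only validates and emits an event, while the applier is the single place where state changes, so replaying the same events later would rebuild the same state.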
Loading from History
The EventSourcedAggregate class provides a class method, load_from_history(cls, aggregate_id, history: List[Event]). A repository typically uses this method to reconstruct an aggregate instance:
1. It creates a new instance of the aggregate with an initial version (usually -1).
2. It iterates through the provided list of historical events and applies each one using _apply_event(), which internally calls the registered applier and increments the version.
3. After all historical events are applied, it clears any uncommitted events (none should exist in this flow, since the events come from storage).
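The three steps above can be sketched as follows. This is a simplified illustration under stated assumptions rather than the library's code: SketchAccount, Deposited, and Withdrawn are hypothetical names, and the appliers are held in a plain dictionary instead of being registered through _register_applier.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Deposited:
    """Hypothetical event for this sketch."""
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    """Hypothetical event for this sketch."""
    amount: int


class SketchAccount:
    """Illustrative aggregate; not the library's EventSourcedAggregate."""

    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = -1
        self.balance = 0
        self.uncommitted_events: List[object] = []
        self._appliers = {
            Deposited: self._apply_deposited,
            Withdrawn: self._apply_withdrawn,
        }

    def _apply_deposited(self, event: Deposited) -> None:
        self.balance += event.amount

    def _apply_withdrawn(self, event: Withdrawn) -> None:
        self.balance -= event.amount

    def _apply_event(self, event: object) -> None:
        self._appliers[type(event)](event)
        self.version += 1

    @classmethod
    def load_from_history(cls, aggregate_id: str, history: List[object]) -> "SketchAccount":
        # Step 1: fresh instance at the initial version (-1).
        aggregate = cls(aggregate_id)
        # Step 2: replay each stored event in order.
        for event in history:
            aggregate._apply_event(event)
        # Step 3: replayed events are already persisted, so nothing is pending.
        aggregate.uncommitted_events.clear()
        return aggregate


history = [Deposited(100), Withdrawn(30), Deposited(5)]
account = SketchAccount.load_from_history("acct-1", history)
```

After replaying three events the sketch ends at version 2, because the version starts at -1 and increments once per applied event.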
Versioning
The version of an EventSourcedAggregate starts at -1 (or at the version provided during snapshot restoration). Each time an event is applied, whether during loading from history or when a new event is recorded and applied, the version is incremented. This version is crucial for optimistic concurrency control when persisting events to an Event Store.
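To illustrate why the version matters for optimistic concurrency, here is a minimal in-memory sketch. It assumes a store that rejects an append when the caller's expected version does not match the stream's current version. InMemoryEventStore, ConcurrencyError, and the append signature are hypothetical and are not taken from the library's Event Store interface.

```python
from typing import Dict, List


class ConcurrencyError(Exception):
    """Hypothetical error raised when the expected version is stale."""


class InMemoryEventStore:
    """Illustrative store; versions are zero-based, and an empty stream is at -1."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[object]] = {}

    def append(self, stream_id: str, events: List[object], expected_version: int) -> int:
        stream = self._streams.setdefault(stream_id, [])
        current_version = len(stream) - 1
        if current_version != expected_version:
            # Another writer appended since this caller loaded the aggregate.
            raise ConcurrencyError(
                f"expected version {expected_version}, found {current_version}"
            )
        stream.extend(events)
        return len(stream) - 1  # new stream version


store = InMemoryEventStore()
store.append("order-1", ["OrderCreated"], expected_version=-1)

# Two writers both loaded the aggregate at version 0.
new_version = store.append("order-1", ["OrderItemAdded"], expected_version=0)
try:
    store.append("order-1", ["OrderItemRemoved"], expected_version=0)
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True
```

The second writer's append fails because the stream has already moved to version 1. A typical response is to reload the aggregate and retry the command.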
Snapshotting Integration
For aggregates with very long event histories, replaying all events every time can become a performance bottleneck. EventSourcedAggregate supports snapshotting to mitigate this. A snapshot is a persisted representation of the aggregate's state at a specific version.
Key methods for snapshotting (which subclasses must implement):
* _get_snapshot_state() -> Any: Returns the current state of the aggregate in a serializable format.
* _apply_snapshot_state(state: Any) -> None: Restores the aggregate's state from a deserialized snapshot.
The base class provides create_snapshot() -> Snapshot, which uses _get_snapshot_state() to package the state along with the aggregate ID and current version into a Snapshot object.
For more details, see Snapshots and SnapshotStore.
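A rough sketch of the snapshot round trip is shown below. It is not the library's implementation: SketchSnapshot, SketchCounter, apply_one_event, and the from_snapshot helper are hypothetical, and only the two hook names and create_snapshot mirror the methods described above.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class SketchSnapshot:
    """Hypothetical snapshot container: ID, version, and serializable state."""
    aggregate_id: str
    version: int
    state: Any


class SketchCounter:
    """Illustrative aggregate; not the library's EventSourcedAggregate."""

    def __init__(self, aggregate_id: str, version: int = -1) -> None:
        self.id = aggregate_id
        self.version = version
        self.count = 0

    def apply_one_event(self) -> None:
        # Stand-in for applying a single event: change state, bump version.
        self.count += 1
        self.version += 1

    def _get_snapshot_state(self) -> Dict[str, int]:
        # Return plain, serializable data only.
        return {"count": self.count}

    def _apply_snapshot_state(self, state: Dict[str, int]) -> None:
        self.count = state["count"]

    def create_snapshot(self) -> SketchSnapshot:
        return SketchSnapshot(self.id, self.version, self._get_snapshot_state())

    @classmethod
    def from_snapshot(cls, snapshot: SketchSnapshot) -> "SketchCounter":
        # Hypothetical helper: resume at the snapshot's version, skipping replay.
        aggregate = cls(snapshot.aggregate_id, version=snapshot.version)
        aggregate._apply_snapshot_state(snapshot.state)
        return aggregate


counter = SketchCounter("counter-1")
for _ in range(3):
    counter.apply_one_event()

snapshot = counter.create_snapshot()
restored = SketchCounter.from_snapshot(snapshot)
restored.apply_one_event()  # only events after the snapshot need applying
```

Because the restored instance starts at the snapshot's version, only events recorded after that version need to be replayed on top of it.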
Handling Unapplied Events
If an event is processed (e.g., via _apply_event or _record_and_apply_event) and no applier has been registered for its type, the _handle_unapplied_event(event: Event) method is called. By default, this logs a warning. Subclasses can override this method to implement stricter error handling (e.g., raising an exception) or other custom logic.
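As an illustration of the stricter option, the sketch below overrides the hook to raise instead of logging. SketchBase is a hypothetical stand-in for the library's base class, and UnappliedEventError and UnknownEvent are invented for the example. Only the _handle_unapplied_event hook name comes from the description above.

```python
import logging

logger = logging.getLogger(__name__)


class UnappliedEventError(Exception):
    """Hypothetical error for events that have no registered applier."""


class UnknownEvent:
    """Hypothetical event type that no applier handles."""


class SketchBase:
    """Illustrative base class showing only the dispatch-and-fallback path."""

    def __init__(self) -> None:
        self._appliers = {}

    def _handle_unapplied_event(self, event: object) -> None:
        # Default behaviour in this sketch: warn and carry on.
        logger.warning("No applier registered for %s", type(event).__name__)

    def _apply_event(self, event: object) -> None:
        applier = self._appliers.get(type(event))
        if applier is None:
            self._handle_unapplied_event(event)
            return
        applier(event)


class StrictAggregate(SketchBase):
    def _handle_unapplied_event(self, event: object) -> None:
        # Fail fast so a missing applier is caught during development.
        raise UnappliedEventError(
            f"No applier registered for {type(event).__name__}"
        )


try:
    StrictAggregate()._apply_event(UnknownEvent())
    message = ""
except UnappliedEventError as exc:
    message = str(exc)
```

Raising makes a forgotten registration visible immediately, at the cost of breaking replay when older streams contain event types the aggregate no longer handles.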
When to Use Event Sourced Aggregates
- When a complete audit trail of all changes to an entity is required.
- For complex domain logic where understanding the history of changes is important for current decisions.
- When you need the ability to reconstruct past states of an aggregate (temporal queries).
- As a foundation for advanced patterns like event stream processing or CQRS.
Best Practices
- Immutable Events: Events should represent facts that have happened and should be immutable.
- Small, Focused Events: Design events to capture specific state transitions.
- Idempotent Appliers (if possible): While events are typically applied once in sequence, designing appliers to be idempotent can simplify certain recovery or reprocessing scenarios, though the primary idempotency concern is often at the command handler level.
- No I/O in Appliers: Event appliers should only modify the aggregate's in-memory state. They should not perform database calls, API requests, or any other I/O operations.
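One way to enforce the immutability recommendation in Python is a frozen dataclass. The sketch below is standalone and does not inherit from the library's Event base class, which may define its own fields or conventions. The field names are borrowed from the OrderItemAdded example earlier on this page.

```python
from dataclasses import FrozenInstanceError, dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderItemAdded:
    """Standalone illustration of an immutable event."""
    order_id: UUID
    product_id: UUID
    quantity: int


event = OrderItemAdded(order_id=uuid4(), product_id=uuid4(), quantity=2)

try:
    event.quantity = 5  # frozen dataclasses reject attribute assignment
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

Note that frozen=True only prevents reassigning attributes. A mutable field value such as a list can still be changed in place, so tuples or other immutable types are the safer choice for event payloads.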
Relationship to Other Components
- Events: EventSourcedAggregate consumes and produces Event objects (see Events).
- Event Store: It relies on an Event Store (via a repository) to persist its event stream and to load its history.
- Repository: An AggregateRepository (often a specialized version for event-sourced aggregates) is responsible for orchestrating the loading (from history and/or snapshots) and saving (appending new events to the Event Store) of EventSourcedAggregate instances.
API Reference
For detailed API information, refer to the EventSourcedAggregate API Reference.