Skip to content

Event Sourced Aggregates

An Event Sourced Aggregate is a specialized type of Aggregate that derives its state entirely from a sequence of historical events. Instead of storing the current state directly, an Event Sourced Aggregate records every change as an immutable Event. When the aggregate needs to be loaded, its state is reconstructed by replaying these events.

Castlecraft Engineer provides the EventSourcedAggregate base class, which extends the standard Aggregate with the necessary mechanisms for event sourcing.

Key Characteristics

State Mutability through Events

The fundamental principle is that the state of an EventSourcedAggregate is never mutated directly. All state changes are the result of applying domain events. This provides a full audit trail and allows for powerful features like temporal queries or replaying history.

Event Appliers

Event appliers are methods within your aggregate that are responsible for changing the aggregate's state based on a specific event. You register these appliers using _register_event_appliers (which you must implement) and the helper _register_applier.

Example of registering an applier:

# In your MyOrderAggregate(EventSourcedAggregate) class:
def _register_event_appliers(self) -> None:
    self._register_applier(OrderCreated, self._apply_order_created)
    self._register_applier(OrderItemAdded, self._apply_item_added)

def _apply_order_created(self, event: OrderCreated) -> None:
    # Assuming OrderCreated is an Event dataclass
    self.customer_id = event.customer_id
    self.status = "PENDING" # Initial state

def _apply_item_added(self, event: OrderItemAdded) -> None:
    # Assuming OrderItemAdded is an Event dataclass
    self._items.append(OrderItem(product_id=event.product_id, quantity=event.quantity))
Each applier method takes the specific event instance as an argument. It should only modify the internal state of the aggregate and should not have external side effects.

Recording Events

When a command method on your aggregate results in a state change, it should create an event object and then call self._record_and_apply_event(event). This method does two things: 1. Records the event in the list of uncommitted events (inherited from the base Aggregate). 2. Applies the event to the current instance using the registered applier, thereby updating its state and incrementing its version.

Example in a command method:

# In your MyOrderAggregate(EventSourcedAggregate) class:
def add_item(self, product_id: UUID, quantity: int, price: float):
    if quantity <= 0:
        raise ValueError("Quantity must be positive.")
    # ... other business rules ...
    item_added_event = OrderItemAdded(order_id=self.id, product_id=product_id, quantity=quantity, price=price)
    self._record_and_apply_event(item_added_event)

Loading from History

The EventSourcedAggregate class provides a class method load_from_history(cls, aggregate_id, history: List[Event]). This method is typically used by a repository to reconstruct an aggregate instance: 1. It creates a new instance of the aggregate with an initial version (usually -1). 2. It iterates through the provided list of historical events and applies each one using _apply_event(). This internally calls the registered appliers and increments the version. 3. After all historical events are applied, any uncommitted events (which shouldn't exist in this flow if events are from storage) are cleared.

Versioning

The version of an EventSourcedAggregate starts at -1 (or the version provided during snapshot restoration). Each time an event is applied (either during loading from history or when a new event is recorded and applied), the version is incremented. This version is crucial for optimistic concurrency control when persisting events to an Event Store.

Snapshotting Integration

For aggregates with very long event histories, replaying all events every time can become a performance bottleneck. EventSourcedAggregate supports snapshotting to mitigate this. A snapshot is a persisted representation of the aggregate's state at a specific version.

Key methods for snapshotting (which subclasses must implement): * _get_snapshot_state() -> Any: Returns the current state of the aggregate in a serializable format. * _apply_snapshot_state(state: Any) -> None: Restores the aggregate's state from a deserialized snapshot.

The base class provides create_snapshot() -> Snapshot which uses _get_snapshot_state() to package the state along with the aggregate ID and current version into a Snapshot object. For more details, see Snapshots and SnapshotStore.

Handling Unapplied Events

If an event is processed for which no applier has been registered (e.g., via _apply_event or _record_and_apply_event), the _handle_unapplied_event(event: Event) method is called. By default, this logs a warning. Subclasses can override this method to implement stricter error handling (e.g., raising an exception) or other custom logic.

When to Use Event Sourced Aggregates

  • When a complete audit trail of all changes to an entity is required.
  • For complex domain logic where understanding the history of changes is important for current decisions.
  • When you need the ability to reconstruct past states of an aggregate (temporal queries).
  • As a foundation for advanced patterns like event stream processing or CQRS.

Best Practices

  • Immutable Events: Events should represent facts that have happened and should be immutable.
  • Small, Focused Events: Design events to capture specific state transitions.
  • Idempotent Appliers (if possible): While events are typically applied once in sequence, designing appliers to be idempotent can simplify certain recovery or reprocessing scenarios, though the primary idempotency concern is often at the command handler level.
  • No I/O in Appliers: Event appliers should only modify the aggregate's in-memory state. They should not perform database calls, API requests, or any other I/O operations.

Relationship to Other Components

  • Events: EventSourcedAggregate consumes and produces Event objects (see Events).
  • Event Store: It relies on an Event Store (via a repository) to persist its event stream and to load its history.
  • Repository: An AggregateRepository (often a specialized version for event-sourced aggregates) is responsible for orchestrating the loading (from history and/or snapshots) and saving (appending new events to the Event Store) of EventSourcedAggregates.

API Reference

For detailed API information, refer to the EventSourcedAggregate API Reference.