
Aggregate Snapshots and SnapshotStore

When working with Event Sourced Aggregates, especially those with a long history of events, reconstructing the aggregate's state by replaying every event can become a performance bottleneck. Snapshots provide an optimization by storing a point-in-time representation of an aggregate's state.

What is a Snapshot?

A snapshot is a persisted, serialized state of an EventSourcedAggregate at a specific version. Instead of replaying all events from the beginning, an application can load the latest snapshot and then replay only the events that occurred after that snapshot was taken.

Castlecraft Engineer defines a Snapshot data structure, typically containing:

  • aggregate_id: The unique identifier of the aggregate.
  • aggregate_state: The serialized state of the aggregate at the time of the snapshot.
  • version: The version of the aggregate after the last event included in this snapshot's state.
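
A minimal sketch of such a structure follows. It assumes a frozen dataclass and an integer version. The field names come from the list above, but the class itself is illustrative. That includes the TAggregateId type variable, and the class may not match Castlecraft Engineer's actual definition.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

# Hypothetical type variable standing in for the aggregate ID type.
TAggregateId = TypeVar("TAggregateId")


@dataclass(frozen=True)
class Snapshot(Generic[TAggregateId]):
    """Illustrative snapshot record; not the library's actual class."""

    aggregate_id: TAggregateId
    # Serializable state as returned by _get_snapshot_state().
    aggregate_state: Any
    # Version after the last event folded into aggregate_state.
    version: int


snap = Snapshot(aggregate_id="order-42", aggregate_state={"status": "PAID"}, version=17)
print(snap.version)  # 17
```

Making the record frozen reflects that a snapshot is a point-in-time capture. It should be replaced by a newer snapshot, not edited in place.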

Snapshotting in EventSourcedAggregate

The EventSourcedAggregate base class includes support for creating and applying snapshots. Subclasses need to implement two key abstract methods:

  • _get_snapshot_state() -> Any:

    • This method is responsible for returning the current, serializable state of the aggregate. The specific format (e.g., a dictionary, a custom data object) is up to the implementer.
    • Example:
      from typing import Any, Dict

      # In MyOrderAggregate(EventSourcedAggregate):
      def _get_snapshot_state(self) -> Dict[str, Any]:
          return {
              "customer_id": self.customer_id,
              "status": self.status,
              "items": [item.to_dict() for item in self._items],
              # Note: version and id are part of the Snapshot object itself
          }
      
  • _apply_snapshot_state(state: Any) -> None:

    • This method restores the aggregate's internal state from the deserialized aggregate_state provided by a Snapshot object.
    • Example:
      from typing import Any, Dict

      # In MyOrderAggregate(EventSourcedAggregate):
      def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
          self.customer_id = state.get("customer_id")
          self.status = state.get("status")
          self._items = [OrderItem.from_dict(item_data) for item_data in state.get("items", [])]
          # Note: the repository sets the aggregate's version after this method returns
      

The base EventSourcedAggregate provides a create_snapshot() -> Snapshot method which uses _get_snapshot_state() to capture the current state and packages it with the aggregate_id and version into a Snapshot object.
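
The sketch below shows how create_snapshot() could combine the two hook methods and how the result round-trips. SketchEventSourcedAggregate, Order, and the plain id/version attributes are hypothetical simplifications, not the library's EventSourcedAggregate. The snapshot is restored by hand here, standing in for what a repository would do.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Snapshot:
    aggregate_id: str
    aggregate_state: Any
    version: int


class SketchEventSourcedAggregate(ABC):
    """Hypothetical, simplified stand-in for EventSourcedAggregate."""

    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = 0

    @abstractmethod
    def _get_snapshot_state(self) -> Any: ...

    @abstractmethod
    def _apply_snapshot_state(self, state: Any) -> None: ...

    def create_snapshot(self) -> Snapshot:
        # Package subclass-provided state with the aggregate's identity and version.
        return Snapshot(
            aggregate_id=self.id,
            aggregate_state=self._get_snapshot_state(),
            version=self.version,
        )


class Order(SketchEventSourcedAggregate):
    def __init__(self, aggregate_id: str) -> None:
        super().__init__(aggregate_id)
        self.status = "NEW"
        self.items: List[str] = []

    def _get_snapshot_state(self) -> Dict[str, Any]:
        # Copy mutable fields so later changes to the aggregate don't leak into the snapshot.
        return {"status": self.status, "items": list(self.items)}

    def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
        self.status = state.get("status", "NEW")
        self.items = list(state.get("items", []))


order = Order("order-42")
order.items.append("sku-1")
order.status = "PAID"
order.version = 3  # normally advanced by applying events

snap = order.create_snapshot()

restored = Order(snap.aggregate_id)
restored._apply_snapshot_state(snap.aggregate_state)
restored.version = snap.version  # a repository would normally set this
print(restored.status, restored.items, restored.version)  # PAID ['sku-1'] 3
```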

The SnapshotStore Abstraction

The SnapshotStore is an abstract base class that defines the contract for persisting and retrieving Snapshot objects. It decouples the aggregate and repository logic from the specific snapshot storage mechanism.

Key methods of SnapshotStore:

  • save_snapshot(snapshot: Snapshot) -> None: Saves the provided snapshot. Implementations might overwrite existing snapshots for the same aggregate if the new one has an equal or higher version.
  • get_latest_snapshot(aggregate_id: TAggregateId) -> Optional[Snapshot]: Retrieves the most recent snapshot for a given aggregate ID, or None if no snapshot exists.
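
To make the contract concrete, here is a minimal dictionary-backed sketch. It keeps only the newest snapshot per aggregate and ignores older ones. DictSnapshotStore is a hypothetical name. The abstract class mirrors the two methods listed above rather than reproducing the library's exact signatures.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generic, Optional, TypeVar

TAggregateId = TypeVar("TAggregateId")


@dataclass(frozen=True)
class Snapshot(Generic[TAggregateId]):
    aggregate_id: TAggregateId
    aggregate_state: Any
    version: int


class SnapshotStore(ABC, Generic[TAggregateId]):
    """Sketch of the SnapshotStore contract (signatures simplified)."""

    @abstractmethod
    def save_snapshot(self, snapshot: Snapshot[TAggregateId]) -> None: ...

    @abstractmethod
    def get_latest_snapshot(self, aggregate_id: TAggregateId) -> Optional[Snapshot[TAggregateId]]: ...


class DictSnapshotStore(SnapshotStore[TAggregateId]):
    """Hypothetical in-memory store keeping one (the newest) snapshot per aggregate."""

    def __init__(self) -> None:
        self._latest: Dict[TAggregateId, Snapshot[TAggregateId]] = {}

    def save_snapshot(self, snapshot: Snapshot[TAggregateId]) -> None:
        current = self._latest.get(snapshot.aggregate_id)
        # Overwrite only when the incoming snapshot is at least as new.
        if current is None or snapshot.version >= current.version:
            self._latest[snapshot.aggregate_id] = snapshot

    def get_latest_snapshot(self, aggregate_id: TAggregateId) -> Optional[Snapshot[TAggregateId]]:
        return self._latest.get(aggregate_id)


store: DictSnapshotStore[str] = DictSnapshotStore()
store.save_snapshot(Snapshot("order-42", {"status": "PAID"}, 100))
store.save_snapshot(Snapshot("order-42", {"status": "NEW"}, 50))  # older: ignored
print(store.get_latest_snapshot("order-42").version)  # 100
print(store.get_latest_snapshot("order-99"))  # None
```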

Castlecraft Engineer provides an InMemorySnapshotStore for testing and development. For production, you would typically implement a SnapshotStore backed by persistent storage, such as Redis, another NoSQL database, or a relational database table.
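
As an illustration of the relational option, the sketch below stores one row per aggregate in SQLite, using Python's built-in sqlite3 module, and serializes aggregate_state as JSON. SqliteSnapshotStore and the snapshots table layout are assumptions. A real implementation would subclass the library's SnapshotStore, whose import path is not shown here, and its aggregate_state would need to be JSON-encodable. The ON CONFLICT ... DO UPDATE ... WHERE upsert requires SQLite 3.24 or newer.

```python
import json
import sqlite3
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Snapshot:
    aggregate_id: str
    aggregate_state: Any
    version: int


class SqliteSnapshotStore:
    """Hypothetical relational store: one row per aggregate, state stored as JSON.

    In a real application this would subclass the library's SnapshotStore.
    """

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS snapshots ("
            " aggregate_id TEXT PRIMARY KEY,"
            " version INTEGER NOT NULL,"
            " state TEXT NOT NULL)"
        )

    def save_snapshot(self, snapshot: Snapshot) -> None:
        # Upsert, but never replace a newer snapshot with an older one.
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT INTO snapshots (aggregate_id, version, state) VALUES (?, ?, ?) "
                "ON CONFLICT(aggregate_id) DO UPDATE SET "
                "version = excluded.version, state = excluded.state "
                "WHERE excluded.version >= snapshots.version",
                (snapshot.aggregate_id, snapshot.version, json.dumps(snapshot.aggregate_state)),
            )

    def get_latest_snapshot(self, aggregate_id: str) -> Optional[Snapshot]:
        row = self._conn.execute(
            "SELECT version, state FROM snapshots WHERE aggregate_id = ?",
            (aggregate_id,),
        ).fetchone()
        if row is None:
            return None
        return Snapshot(aggregate_id, json.loads(row[1]), row[0])


store = SqliteSnapshotStore(sqlite3.connect(":memory:"))
store.save_snapshot(Snapshot("order-42", {"status": "PAID", "items": ["sku-1"]}, 100))
store.save_snapshot(Snapshot("order-42", {"status": "NEW", "items": []}, 50))  # older: no-op
latest = store.get_latest_snapshot("order-42")
print(latest.version, latest.aggregate_state["status"])  # 100 PAID
```

Placing the version check in the SQL statement keeps the "equal or higher version" rule atomic. A separate read-then-write would let two concurrent writers race.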

Snapshotting Strategy

Deciding when and how often to take snapshots is an important consideration:

  • Frequency: Snapshots can be taken every N events (e.g., every 50 or 100 events), based on time intervals, or triggered by specific aggregate versions. The optimal frequency depends on the rate of events and the performance requirements for loading aggregates.
  • Triggering: The responsibility for triggering snapshot creation often lies with the repository after successfully saving new events, or it could be handled by a separate background process.
  • Serialization: The aggregate_state within a snapshot needs to be serialized. Choose a stable and efficient serialization format (e.g., JSON, MessagePack, Protobuf). Consider schema evolution if your aggregate's state structure changes over time.
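
One way to implement the every-N-events frequency is a small predicate that the repository calls after successfully saving new events. The function name and the default of 50 are assumptions. It compares version buckets instead of testing version % every == 0, because a single save may append several events and skip over the exact multiple.

```python
def should_snapshot(version_before: int, version_after: int, every: int = 50) -> bool:
    """Return True if a save moved the aggregate across a multiple of `every` events.

    version_before: aggregate version before the newly saved events.
    version_after: aggregate version after they were appended.
    """
    if every <= 0:
        raise ValueError("every must be a positive number of events")
    # Floor division counts how many complete N-event blocks each version covers;
    # a change in that count means a snapshot boundary was crossed.
    return version_after // every > version_before // every


# Simulated saves: (version before, version after) for each batch of new events.
saves = [(0, 20), (20, 48), (48, 53), (53, 99), (99, 101)]
print([after for before, after in saves if should_snapshot(before, after)])  # [53, 101]
```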

Loading an Aggregate with Snapshots (Conceptual Flow)

A repository responsible for loading an EventSourcedAggregate typically follows this process:

  1. Attempt to retrieve the latest snapshot for the aggregate ID from the SnapshotStore.
  2. If a snapshot is found:
    a. Create an instance of the aggregate.
    b. Apply the snapshot's state to the instance using _apply_snapshot_state(snapshot.aggregate_state).
    c. Set the aggregate's current version to snapshot.version.
    d. Fetch events from the EventStore that occurred after snapshot.version.
    e. Apply these subsequent events to the aggregate instance to bring it to the latest state.
  3. If no snapshot is found:
    a. Fetch all events for the aggregate ID from the EventStore.
    b. Reconstruct the aggregate by replaying all events using EventSourcedAggregate.load_from_history().
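
The flow above can be sketched end to end with toy components. Cart, ItemAdded, ListEventStore, and load_cart are hypothetical. A plain dict stands in for SnapshotStore.get_latest_snapshot(), and the no-snapshot branch replays events directly instead of calling EventSourcedAggregate.load_from_history(). The function also returns how many events were replayed, which makes the saving visible.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass(frozen=True)
class Snapshot:
    aggregate_id: str
    aggregate_state: Any
    version: int


@dataclass(frozen=True)
class ItemAdded:
    sku: str


class Cart:
    """Toy event-sourced aggregate (hypothetical); each event bumps the version by 1."""

    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = 0
        self.items: List[str] = []

    def apply(self, event: ItemAdded) -> None:
        self.items.append(event.sku)
        self.version += 1

    def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
        self.items = list(state["items"])


class ListEventStore:
    """In-memory event streams; the event at list index i has version i + 1."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[ItemAdded]] = {}

    def append(self, aggregate_id: str, event: ItemAdded) -> None:
        self._streams.setdefault(aggregate_id, []).append(event)

    def load_events(self, aggregate_id: str, after_version: int = 0) -> List[ItemAdded]:
        # Returns only events whose version is greater than after_version.
        return self._streams.get(aggregate_id, [])[after_version:]


def load_cart(
    aggregate_id: str, event_store: ListEventStore, snapshots: Dict[str, Snapshot]
) -> Tuple[Cart, int]:
    cart = Cart(aggregate_id)  # fresh instance for either branch
    snapshot = snapshots.get(aggregate_id)  # step 1: latest snapshot, if any
    if snapshot is not None:
        cart._apply_snapshot_state(snapshot.aggregate_state)  # step 2b
        cart.version = snapshot.version  # step 2c
        events = event_store.load_events(aggregate_id, after_version=snapshot.version)  # 2d
    else:
        events = event_store.load_events(aggregate_id)  # step 3a: full history
    for event in events:  # steps 2e / 3b: replay what remains
        cart.apply(event)
    return cart, len(events)


store = ListEventStore()
for n in range(1, 106):
    store.append("cart-1", ItemAdded(f"sku-{n}"))

# Pretend a snapshot was taken at version 100.
snapshots = {"cart-1": Snapshot("cart-1", {"items": [f"sku-{n}" for n in range(1, 101)]}, 100)}

with_snap, replayed = load_cart("cart-1", store, snapshots)
without_snap, replayed_all = load_cart("cart-1", store, {})
print(with_snap.version, replayed)  # 105 5
print(without_snap.version, replayed_all)  # 105 105
print(with_snap.items == without_snap.items)  # True
```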

Benefits of Snapshots

  • Improved Read Performance: Significantly speeds up the loading of aggregates with many events.
  • Reduced Load on EventStore: Fewer events need to be read from the EventStore during aggregate hydration.

Considerations

  • Storage Cost: Snapshots consume additional storage space.
  • Complexity: Introduces the SnapshotStore and the logic for managing snapshot creation and restoration.
  • Snapshot Staleness: A snapshot is a point-in-time capture, so events recorded after it must still be replayed on load. This is by design. The event stream remains the source of truth, and replaying the newer events keeps the loaded aggregate consistent.
  • Schema Evolution: Changes to the aggregate's state structure may require strategies for migrating or handling older snapshot formats.
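
A common way to handle older snapshot formats is to store a schema marker in aggregate_state. Old states are then upgraded ("upcast") step by step before they are applied. The schema key, CURRENT_SCHEMA, and the v1-to-v2 change below are hypothetical. An aggregate's _apply_snapshot_state() could call upcast_snapshot_state() before reading any fields.

```python
from typing import Any, Callable, Dict

CURRENT_SCHEMA = 2  # hypothetical current layout of aggregate_state


def _v1_to_v2(state: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical change: v1 stored items as bare SKU strings,
    # v2 stores {"sku", "qty"} records.
    return {
        **state,
        "items": [{"sku": sku, "qty": 1} for sku in state.get("items", [])],
        "schema": 2,
    }


# Maps a schema version to the function that upgrades it by exactly one step.
UPCASTERS: Dict[int, Callable[[Dict[str, Any]], Dict[str, Any]]] = {1: _v1_to_v2}


def upcast_snapshot_state(state: Dict[str, Any]) -> Dict[str, Any]:
    """Apply upcasters in order until the state reaches CURRENT_SCHEMA."""
    # Snapshots written before versioning was introduced carry no marker: treat as v1.
    schema = state.get("schema", 1)
    if schema > CURRENT_SCHEMA:
        raise ValueError(f"snapshot schema {schema} is newer than supported {CURRENT_SCHEMA}")
    while schema < CURRENT_SCHEMA:
        state = UPCASTERS[schema](state)
        schema = state["schema"]
    return state


old = {"status": "PAID", "items": ["sku-1", "sku-2"]}  # pre-versioning snapshot state
print(upcast_snapshot_state(old))
# {'status': 'PAID', 'items': [{'sku': 'sku-1', 'qty': 1}, {'sku': 'sku-2', 'qty': 1}], 'schema': 2}
```

A simpler alternative is to discard snapshots whose schema is not recognized and fall back to a full event replay. This is safe because the events remain the source of truth.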

API Reference

For detailed API information, refer to:

  • Snapshot API Reference
  • SnapshotStore API Reference