Aggregate Snapshots and SnapshotStore
When working with Event Sourced Aggregates, especially those with a long history of events, reconstructing the aggregate's state by replaying every event can become a performance bottleneck. Snapshots provide an optimization by storing a point-in-time representation of an aggregate's state.
What is a Snapshot?
A snapshot is a persisted, serialized state of an `EventSourcedAggregate` at a specific version. Instead of replaying all events from the beginning, an application can load the latest snapshot and then replay only the events that occurred after that snapshot was taken.
Castlecraft Engineer defines a `Snapshot` data structure, typically containing:
* `aggregate_id`: The unique identifier of the aggregate.
* `aggregate_state`: The serialized state of the aggregate at the time of the snapshot.
* `version`: The version of the aggregate after the last event included in this snapshot's state.
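As a rough illustration of the three fields listed above, the structure could be modelled as a frozen dataclass. This is a minimal sketch, not the library's actual class definition; the generic `TAggregateId` parameter and the sample values are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

TAggregateId = TypeVar("TAggregateId")


@dataclass(frozen=True)
class Snapshot(Generic[TAggregateId]):
    """Illustrative stand-in for the library's Snapshot structure."""

    aggregate_id: TAggregateId  # unique identifier of the aggregate
    aggregate_state: Any  # serializable state captured at snapshot time
    version: int  # version after the last event reflected in the state


# Sample values are made up for illustration.
snapshot = Snapshot(
    aggregate_id="order-42",
    aggregate_state={"customer_id": "c-7", "status": "PAID", "items": []},
    version=120,
)
```

Making the dataclass frozen reflects the idea that a snapshot is a point-in-time record and should not change after it is taken.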
Snapshotting in EventSourcedAggregate
The `EventSourcedAggregate` base class includes support for creating and applying snapshots. Subclasses need to implement two key abstract methods:
- `_get_snapshot_state() -> Any`: Returns the current, serializable state of the aggregate. The specific format (e.g., a dictionary or a custom data object) is up to the implementer.
- `_apply_snapshot_state(state: Any) -> None`: Restores the aggregate's internal state from the deserialized `aggregate_state` provided by a `Snapshot` object. Example:

```python
from typing import Any, Dict


# In MyOrderAggregate(EventSourcedAggregate):
def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
    self.customer_id = state.get("customer_id")
    self.status = state.get("status")
    self._items = [
        OrderItem.from_dict(item_data) for item_data in state.get("items", [])
    ]
    # Note: the repository sets the aggregate's version after this method runs.
```
The base `EventSourcedAggregate` provides a `create_snapshot() -> Snapshot` method, which uses `_get_snapshot_state()` to capture the current state and packages it with the `aggregate_id` and `version` into a `Snapshot` object.
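To show how the two abstract methods and `create_snapshot()` fit together, here is a self-contained sketch of a round trip. The `EventSourcedAggregate` and `Snapshot` classes below are simplified stand-ins written for this example, not the library's implementations. The `OrderItem` class, its `to_dict()` method, and the attribute names are likewise hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Snapshot:  # stand-in, not the library class
    aggregate_id: str
    aggregate_state: Any
    version: int


class EventSourcedAggregate(ABC):  # simplified stand-in for the real base class
    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = 0

    @abstractmethod
    def _get_snapshot_state(self) -> Any: ...

    @abstractmethod
    def _apply_snapshot_state(self, state: Any) -> None: ...

    def create_snapshot(self) -> Snapshot:
        # Package the subclass-provided state with the id and version.
        return Snapshot(self.id, self._get_snapshot_state(), self.version)


@dataclass
class OrderItem:  # hypothetical value object
    sku: str
    quantity: int

    def to_dict(self) -> Dict[str, Any]:
        return {"sku": self.sku, "quantity": self.quantity}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OrderItem":
        return cls(sku=data["sku"], quantity=data["quantity"])


class MyOrderAggregate(EventSourcedAggregate):
    def __init__(self, aggregate_id: str) -> None:
        super().__init__(aggregate_id)
        self.customer_id: Optional[str] = None
        self.status: Optional[str] = None
        self._items: List[OrderItem] = []

    def _get_snapshot_state(self) -> Dict[str, Any]:
        # Return only plain, serializable values.
        return {
            "customer_id": self.customer_id,
            "status": self.status,
            "items": [item.to_dict() for item in self._items],
        }

    def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
        self.customer_id = state.get("customer_id")
        self.status = state.get("status")
        self._items = [OrderItem.from_dict(d) for d in state.get("items", [])]


order = MyOrderAggregate("order-42")
order.customer_id = "c-7"
order.status = "PAID"
order._items = [OrderItem("sku-1", 2)]
order.version = 3

snapshot = order.create_snapshot()

restored = MyOrderAggregate(snapshot.aggregate_id)
restored._apply_snapshot_state(snapshot.aggregate_state)
restored.version = snapshot.version  # normally the repository's responsibility
```

Keeping `_get_snapshot_state()` and `_apply_snapshot_state()` as exact inverses of each other makes the pair easy to verify with a round-trip test like the one above.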
The SnapshotStore Abstraction
The SnapshotStore
is an abstract base class that defines the contract for persisting and retrieving Snapshot
objects. It decouples the aggregate and repository logic from the specific snapshot storage mechanism.
Key methods of `SnapshotStore`:
* `save_snapshot(snapshot: Snapshot) -> None`: Saves the provided snapshot. Implementations might overwrite existing snapshots for the same aggregate if the new one has an equal or higher version.
* `get_latest_snapshot(aggregate_id: TAggregateId) -> Optional[Snapshot]`: Retrieves the most recent snapshot for a given aggregate ID, or `None` if no snapshot exists.
Castlecraft Engineer provides an `InMemorySnapshotStore` for testing and development. For production, you would typically implement a `SnapshotStore` backed by persistent storage (e.g., Redis, a NoSQL database, or a relational database table).
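The contract described above can be sketched with a small dictionary-backed store. This is only an illustration under stated assumptions: the `SnapshotStore` and `Snapshot` classes here are local stand-ins, `DictSnapshotStore` is a hypothetical name, and the methods are written synchronously even though the library's real methods may be coroutines.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class Snapshot:  # stand-in, not the library class
    aggregate_id: str
    aggregate_state: Any
    version: int


class SnapshotStore(ABC):  # stand-in mirroring the two documented methods
    @abstractmethod
    def save_snapshot(self, snapshot: Snapshot) -> None: ...

    @abstractmethod
    def get_latest_snapshot(self, aggregate_id: str) -> Optional[Snapshot]: ...


class DictSnapshotStore(SnapshotStore):
    """Hypothetical store keeping one snapshot per aggregate in a dict."""

    def __init__(self) -> None:
        self._snapshots: Dict[str, Snapshot] = {}

    def save_snapshot(self, snapshot: Snapshot) -> None:
        current = self._snapshots.get(snapshot.aggregate_id)
        # Overwrite only when the incoming snapshot has an equal or higher version.
        if current is None or snapshot.version >= current.version:
            self._snapshots[snapshot.aggregate_id] = snapshot

    def get_latest_snapshot(self, aggregate_id: str) -> Optional[Snapshot]:
        return self._snapshots.get(aggregate_id)


store = DictSnapshotStore()
store.save_snapshot(Snapshot("order-42", {"status": "PAID"}, version=100))
# A stale snapshot arriving late is ignored by the version check.
store.save_snapshot(Snapshot("order-42", {"status": "NEW"}, version=50))
```

A persistent implementation would replace the dictionary with reads and writes against the chosen database while keeping the same two methods.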
Snapshotting Strategy
Deciding when and how often to take snapshots is an important consideration:
* Frequency: Snapshots can be taken every N events (e.g., every 50 or 100 events), based on time intervals, or triggered by specific aggregate versions. The optimal frequency depends on the rate of events and the performance requirements for loading aggregates.
* Triggering: The responsibility for triggering snapshot creation often lies with the repository after successfully saving new events, or it could be handled by a separate background process.
* Serialization: The `aggregate_state` within a snapshot needs to be serialized. Choose a stable and efficient serialization format (e.g., JSON, MessagePack, Protobuf). Consider schema evolution if your aggregate's state structure changes over time.
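Two of the points above, the "every N events" frequency rule and JSON serialization, can be sketched as small helper functions. The names `should_snapshot`, `serialize_state`, `deserialize_state`, and the interval of 50 are assumptions chosen for illustration; they are not part of the library.

```python
import json
from typing import Any, Dict

SNAPSHOT_INTERVAL = 50  # hypothetical threshold; tune per workload


def should_snapshot(
    version_before: int, version_after: int, interval: int = SNAPSHOT_INTERVAL
) -> bool:
    """Return True when a save moved the aggregate across a multiple of `interval`."""
    # Comparing the quotients also catches saves that append several events
    # at once and jump over the exact multiple.
    return version_after // interval > version_before // interval


def serialize_state(state: Dict[str, Any]) -> bytes:
    # sort_keys keeps the output deterministic for identical states.
    return json.dumps(state, sort_keys=True).encode("utf-8")


def deserialize_state(payload: bytes) -> Dict[str, Any]:
    return json.loads(payload.decode("utf-8"))
```

A repository could call `should_snapshot(old_version, new_version)` right after it saves new events and, when the result is true, ask the aggregate for a snapshot and hand it to the store.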
Loading an Aggregate with Snapshots (Conceptual Flow)
A repository responsible for loading an `EventSourcedAggregate` typically follows this process:
1. Attempt to retrieve the latest snapshot for the aggregate ID from the `SnapshotStore`.
2. If a snapshot is found:
    a. Create an instance of the aggregate.
    b. Apply the snapshot's state to the instance using `_apply_snapshot_state(snapshot.aggregate_state)`.
    c. Set the aggregate's current version to `snapshot.version`.
    d. Fetch events from the `EventStore` that occurred after `snapshot.version`.
    e. Apply these subsequent events to the aggregate instance to bring it to the latest state.
3. If no snapshot is found:
    a. Fetch all events for the aggregate ID from the `EventStore`.
    b. Reconstruct the aggregate by replaying all events using `EventSourcedAggregate.load_from_history()`.
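The flow above can be sketched in a few lines. To keep the example self-contained, plain dictionaries stand in for the `SnapshotStore` and `EventStore`, and `Account`, `Deposited`, and `load_account` are hypothetical names. The sketch also assumes that versions count events starting from 1, so the events after version `v` begin at list index `v`. It folds the no-snapshot branch into the same replay loop rather than calling `load_from_history()`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Snapshot:  # stand-in, not the library class
    aggregate_id: str
    aggregate_state: Any
    version: int


@dataclass(frozen=True)
class Deposited:  # hypothetical event
    amount: int


class Account:  # hypothetical aggregate, reduced to what the flow needs
    def __init__(self, aggregate_id: str) -> None:
        self.id = aggregate_id
        self.version = 0
        self.balance = 0

    def _apply_snapshot_state(self, state: Dict[str, Any]) -> None:
        self.balance = state["balance"]

    def apply_event(self, event: Deposited) -> None:
        self.balance += event.amount
        self.version += 1


def load_account(
    aggregate_id: str,
    snapshots: Dict[str, Snapshot],
    events: Dict[str, List[Deposited]],
) -> Account:
    account = Account(aggregate_id)
    snapshot = snapshots.get(aggregate_id)  # step 1: look for a snapshot
    if snapshot is not None:  # step 2: restore state and version
        account._apply_snapshot_state(snapshot.aggregate_state)
        account.version = snapshot.version
    # Replay only events after the current version (all of them if version is 0).
    for event in events.get(aggregate_id, [])[account.version:]:
        account.apply_event(event)
    return account


events = {"acc-1": [Deposited(10), Deposited(5), Deposited(20), Deposited(1)]}
snapshots = {"acc-1": Snapshot("acc-1", {"balance": 15}, version=2)}

from_snapshot = load_account("acc-1", snapshots, events)
from_scratch = load_account("acc-1", {}, events)
```

Both paths must arrive at the same state and version; comparing them, as the two final calls allow, is a useful consistency check for any snapshot implementation.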
Benefits of Snapshots
- Improved Read Performance: Significantly speeds up the loading of aggregates with many events.
- Reduced Load on EventStore: Fewer events need to be read from the `EventStore` during aggregate hydration.
Considerations
- Storage Cost: Snapshots consume additional storage space.
- Complexity: Introduces the `SnapshotStore` and the logic for managing snapshot creation and restoration.
- Snapshot Staleness: A snapshot is a point-in-time capture. Events occurring after the snapshot still need to be replayed. This is by design and ensures consistency.
- Schema Evolution: Changes to the aggregate's state structure may require strategies for migrating or handling older snapshot formats.
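
One possible way to handle older snapshot formats is to tag each serialized state with a schema version and upgrade it step by step before applying it. The sketch below is an assumption-laden illustration: the `schema_version` key, the `UPCASTERS` table, and the renamed `name` field are all invented for the example and are not part of the library.

```python
from typing import Any, Callable, Dict

CURRENT_SCHEMA_VERSION = 2


def _v1_to_v2(state: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical change: v1 stored "name"; v2 stores "customer_name".
    migrated = dict(state)  # copy so the caller's dict is left untouched
    migrated["customer_name"] = migrated.pop("name", None)
    migrated["schema_version"] = 2
    return migrated


# Maps a schema version to the function that upgrades it by one step.
UPCASTERS: Dict[int, Callable[[Dict[str, Any]], Dict[str, Any]]] = {1: _v1_to_v2}


def upgrade_snapshot_state(state: Dict[str, Any]) -> Dict[str, Any]:
    # States written before versioning was introduced are treated as version 1.
    version = state.get("schema_version", 1)
    while version < CURRENT_SCHEMA_VERSION:
        state = UPCASTERS[version](state)
        version = state["schema_version"]
    return state


old_state = {"name": "Ada", "status": "PAID"}
new_state = upgrade_snapshot_state(old_state)
```

Because snapshots are an optimization rather than the source of truth, a simpler alternative is to discard snapshots in an outdated format and rebuild the aggregate from its events.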
API Reference
For detailed API information, refer to:
* Snapshot API Reference
* SnapshotStore API Reference