Skip to content

castlecraft_engineer.abstractions.snapshot_store

castlecraft_engineer.abstractions.snapshot_store

SnapshotStore

Bases: Generic[TAggregateId], ABC

Abstract base class for a snapshot store, responsible for persisting and retrieving aggregate snapshots.

Source code in src/castlecraft_engineer/abstractions/snapshot_store.py
class SnapshotStore(Generic[TAggregateId], abc.ABC):
    """
    Contract for components that persist and load aggregate snapshots.

    Concrete stores must implement ``save_snapshot`` and
    ``get_latest_snapshot``. ``clear_snapshots`` is an optional hook whose
    base implementation does nothing.
    """

    @abc.abstractmethod
    async def save_snapshot(self, snapshot: Snapshot[TAggregateId]) -> None:
        """
        Persist a snapshot of an aggregate.

        What happens when a snapshot for the same aggregate_id at the same
        (or a newer) version is already stored is up to the concrete store:
        it may overwrite the stored one or ignore the new one. The usual
        choice is to overwrite when the incoming snapshot is for a more
        recent version.

        Args:
            snapshot: The Snapshot object to persist.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_latest_snapshot(
        self,
        aggregate_id: TAggregateId,
    ) -> Optional[Snapshot[TAggregateId]]:
        """
        Load the most recent snapshot stored for an aggregate.

        Args:
            aggregate_id: The ID of the aggregate whose snapshot is wanted.

        Returns:
            The latest Snapshot object, or None when the aggregate has no
            snapshot.
        """
        raise NotImplementedError()

    async def clear_snapshots(self, aggregate_id: TAggregateId) -> None:
        """
        Optional hook: remove every snapshot stored for one aggregate.

        Args:
            aggregate_id: The ID of the aggregate whose snapshots are cleared.
        """
        # Base implementation is a no-op; a subclass may override it, or
        # raise NotImplementedError if it cannot support clearing.
        return None

clear_snapshots(aggregate_id) async

Clears all snapshots for a given aggregate. Overriding this method is optional: the default implementation does nothing.

Source code in src/castlecraft_engineer/abstractions/snapshot_store.py
async def clear_snapshots(self, aggregate_id: TAggregateId) -> None:
    """
    Optional hook: remove every snapshot stored for one aggregate.

    Args:
        aggregate_id: The ID of the aggregate whose snapshots are cleared.
    """
    # Base implementation is a no-op; an override may do real work, or
    # raise NotImplementedError if clearing is unsupported.
    return None

get_latest_snapshot(aggregate_id) abstractmethod async

Retrieves the latest snapshot for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate whose snapshot is to be loaded.

required

Returns:

Type Description
Optional[Snapshot[TAggregateId]]

The latest Snapshot object, or None if no snapshot exists for the aggregate.

Source code in src/castlecraft_engineer/abstractions/snapshot_store.py
@abc.abstractmethod
async def get_latest_snapshot(
    self,
    aggregate_id: TAggregateId,
) -> Optional[Snapshot[TAggregateId]]:
    """
    Load the most recent snapshot stored for an aggregate.

    Args:
        aggregate_id: The ID of the aggregate whose snapshot is wanted.

    Returns:
        The latest Snapshot object, or None when the aggregate has no
        snapshot.
    """
    raise NotImplementedError()

save_snapshot(snapshot) abstractmethod async

Saves an aggregate snapshot. If a snapshot for the given aggregate_id and version (or newer) already exists, it might be overwritten or ignored, depending on the store's strategy. Typically, you'd overwrite if the new snapshot is for a more recent version.

Parameters:

Name Type Description Default
snapshot Snapshot[TAggregateId]

The Snapshot object to persist.

required
Source code in src/castlecraft_engineer/abstractions/snapshot_store.py
@abc.abstractmethod
async def save_snapshot(self, snapshot: Snapshot[TAggregateId]) -> None:
    """
    Persist a snapshot of an aggregate.

    What happens when a snapshot for the same aggregate_id at the same
    (or a newer) version is already stored is up to the concrete store:
    it may overwrite the stored one or ignore the new one. The usual
    choice is to overwrite when the incoming snapshot is for a more
    recent version.

    Args:
        snapshot: The Snapshot object to persist.
    """
    raise NotImplementedError()