Skip to content

castlecraft_engineer.abstractions.event_store

castlecraft_engineer.abstractions.event_store

EventStore

Bases: Generic[TAggregateId], ABC

Abstract base class for an event store, responsible for persisting and retrieving streams of domain events.

Source code in src/castlecraft_engineer/abstractions/event_store.py
class EventStore(Generic[TAggregateId], abc.ABC):
    """
    Interface for persisting and retrieving per-aggregate streams of
    domain events.

    Concrete stores must implement all three coroutines declared here:
    ``append_events``, ``load_events`` and ``get_current_version``.
    """

    @abc.abstractmethod
    async def append_events(
        self,
        aggregate_id: TAggregateId,
        expected_version: int,
        events: List[Event],
    ) -> None:
        """
        Append ``events`` to the stream of the aggregate ``aggregate_id``.

        Args:
            aggregate_id: ID of the aggregate that owns the events.
            expected_version: Version of the aggregate the caller based these
                              events on. Serves as the optimistic concurrency
                              check: when the store's current version differs,
                              an EventStoreConflictError should be raised.
            events: Domain event instances to append.

        Raises:
            EventStoreConflictError: When expected_version differs from the
                                     current version of the aggregate's stream.
            Exception: Implementation-specific storage failures.

        NOTE(review): the value of expected_version to pass for a stream that
        does not exist yet is not specified by this contract — confirm against
        the concrete implementations.
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def load_events(
        self,
        aggregate_id: TAggregateId,
        from_version: Optional[int] = None,
    ) -> List[Event]:
        """
        Load the event stream of the aggregate ``aggregate_id``.

        Args:
            aggregate_id: ID of the aggregate whose events are wanted.
            from_version: Optional version at which loading starts. When None,
                          every event of the aggregate is loaded.

        Returns:
            The domain event instances ordered by their sequence; an empty
            list when the aggregate has no events or doesn't exist.

        NOTE(review): whether from_version itself is included in the result is
        not specified by this contract — confirm against the implementations.
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
        """
        Report the current version of the aggregate's event stream.

        Args:
            aggregate_id: ID of the aggregate.

        Returns:
            The current version (number of events - 1, or sequence of last
            event), or None when the aggregate stream doesn't exist.

        NOTE(review): two version conventions are named above; which one
        applies presumably depends on the implementation — confirm.
        """
        raise NotImplementedError

append_events(aggregate_id, expected_version, events) abstractmethod async

Appends a list of events to the stream for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate to which the events belong.

required
expected_version int

The version of the aggregate that these events are based on. Used for optimistic concurrency control. If the current version in the store does not match this, an EventStoreConflictError should be raised.

required
events List[Event]

A list of domain event instances to append.

required

Raises:

Type Description
EventStoreConflictError

If the expected_version does not match the current version of the event stream for the aggregate.

Exception

Implementation-specific exceptions related to storage failures.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def append_events(
    self,
    aggregate_id: TAggregateId,
    expected_version: int,
    events: List[Event],
) -> None:
    """
    Append ``events`` to the stream of the aggregate ``aggregate_id``.

    Args:
        aggregate_id: ID of the aggregate that owns the events.
        expected_version: Version of the aggregate the caller based these
                          events on. Serves as the optimistic concurrency
                          check: when the store's current version differs,
                          an EventStoreConflictError should be raised.
        events: Domain event instances to append.

    Raises:
        EventStoreConflictError: When expected_version differs from the
                                 current version of the aggregate's stream.
        Exception: Implementation-specific storage failures.

    NOTE(review): the value of expected_version to pass for a stream that
    does not exist yet is not specified by this contract — confirm against
    the concrete implementations.
    """
    raise NotImplementedError

get_current_version(aggregate_id) abstractmethod async

Retrieves the current version of the event stream for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate.

required

Returns:

Type Description
Optional[int]

The current version (number of events - 1, or sequence of last event), or None if the aggregate stream doesn't exist.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
    """
    Report the current version of the aggregate's event stream.

    Args:
        aggregate_id: ID of the aggregate.

    Returns:
        The current version (number of events - 1, or sequence of last
        event), or None when the aggregate stream doesn't exist.

    NOTE(review): two version conventions are named above; which one
    applies presumably depends on the implementation — confirm.
    """
    raise NotImplementedError

load_events(aggregate_id, from_version=None) abstractmethod async

Loads the stream of events for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate whose events are to be loaded.

required
from_version Optional[int]

Optionally, the version from which to start loading events. If None, loads all events for the aggregate.

None

Returns:

Type Description
List[Event]

A list of domain event instances, ordered by their sequence. Returns an empty list if the aggregate has no events or doesn't exist.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def load_events(
    self,
    aggregate_id: TAggregateId,
    from_version: Optional[int] = None,
) -> List[Event]:
    """
    Load the event stream of the aggregate ``aggregate_id``.

    Args:
        aggregate_id: ID of the aggregate whose events are wanted.
        from_version: Optional version at which loading starts. When None,
                      every event of the aggregate is loaded.

    Returns:
        The domain event instances ordered by their sequence; an empty
        list when the aggregate has no events or doesn't exist.

    NOTE(review): whether from_version itself is included in the result is
    not specified by this contract — confirm against the implementations.
    """
    raise NotImplementedError

EventStoreConflictError

Bases: RuntimeError

Raised when there's a conflict appending events, e.g., due to version mismatch.

Source code in src/castlecraft_engineer/abstractions/event_store.py
class EventStoreConflictError(RuntimeError):
    """
    Raised when there's a conflict appending events, e.g., due to version mismatch.

    The aggregate ID and the expected/actual versions are kept as attributes
    so handlers can inspect them without parsing the message. Instances can
    be copied and pickled (see ``__reduce__``).
    """

    def __init__(
        self,
        aggregate_id: "TAggregateId",
        expected_version: int,
        actual_version: Optional[int] = None,
    ):
        """
        Build the error and its human-readable message.

        Args:
            aggregate_id: ID of the aggregate whose append conflicted.
            expected_version: Version the caller expected the stream to be at.
            actual_version: Version reported as current, or None if it was
                            not supplied.
        """
        self.aggregate_id = aggregate_id
        self.expected_version = expected_version
        self.actual_version = actual_version
        super().__init__(
            f"Conflict appending events for aggregate {aggregate_id}. "
            f"Expected version {expected_version}, but current version is {actual_version}."
        )

    def __reduce__(self):
        """
        Rebuild the error from its structured fields when copied or pickled.

        The inherited implementation re-creates the instance from ``args``,
        which holds only the formatted message; that call fails because
        ``__init__`` requires ``aggregate_id`` and ``expected_version``.

        Returns:
            A ``(callable, args, state)`` tuple as expected by the pickle
            protocol; ``state`` carries any extra instance attributes.
        """
        return (
            type(self),
            (self.aggregate_id, self.expected_version, self.actual_version),
            self.__dict__,
        )