Event Store

The Event Store is a fundamental component when implementing the Event Sourcing pattern. In Event Sourcing, all changes to an application's state are stored as a sequence of immutable events. The Event Store is the specialized mechanism responsible for persisting and retrieving these event streams.

EventStore Abstraction in Castlecraft Engineer

Castlecraft Engineer provides an EventStore[TAggregateId] abstract base class (ABC) to decouple your domain logic from the specific event storage technology. This allows you to switch event storage implementations (e.g., an in-memory store for testing, a relational database, or a dedicated event store like EventStoreDB for production) without altering your core business logic.

The TAggregateId is a generic type parameter, meaning the Event Store can work with aggregates that use any type of identifier (e.g., UUID, int, str).

Key Components

EventStoreConflictError

This exception is raised by the append_events method when an optimistic concurrency conflict is detected. This typically happens if the expected_version provided during an append operation does not match the current version of the aggregate's event stream in the store, indicating that another process might have modified the aggregate since it was last loaded.

class EventStoreConflictError(RuntimeError):
    def __init__(self, aggregate_id: TAggregateId, expected_version: int, actual_version: Optional[int] = None):
        self.aggregate_id = aggregate_id
        self.expected_version = expected_version
        self.actual_version = actual_version
        super().__init__(
            f"Conflict appending events for aggregate {aggregate_id}. "
            f"Expected version {expected_version}, but current version is {actual_version}."
        )
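As a rough illustration of how a caller might inspect a conflict, the sketch below redefines the exception locally so that it runs without the library, with Any standing in for TAggregateId. The describe_conflict helper, the aggregate ID "order-42", and the version numbers are all made up for the example.

```python
from typing import Any, Optional


class EventStoreConflictError(RuntimeError):
    """Local stand-in mirroring the exception shown above."""

    def __init__(self, aggregate_id: Any, expected_version: int,
                 actual_version: Optional[int] = None):
        self.aggregate_id = aggregate_id
        self.expected_version = expected_version
        self.actual_version = actual_version
        super().__init__(
            f"Conflict appending events for aggregate {aggregate_id}. "
            f"Expected version {expected_version}, "
            f"but current version is {actual_version}."
        )


def describe_conflict(exc: EventStoreConflictError) -> str:
    """Hypothetical helper: summarise how far the caller's view lags behind."""
    if exc.actual_version is None:
        return f"{exc.aggregate_id}: stream version unknown"
    missed = exc.actual_version - exc.expected_version
    return f"{exc.aggregate_id}: {missed} event(s) appended by another writer"


try:
    # Simulate a store rejecting an append that was based on stale state.
    raise EventStoreConflictError("order-42", expected_version=3, actual_version=5)
except EventStoreConflictError as exc:
    summary = describe_conflict(exc)
    print(summary)
```

Because the exception carries both versions as attributes, a caller can decide whether to reload and retry or to surface the conflict to the user.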

append_events

async def append_events(
    self,
    aggregate_id: TAggregateId,
    expected_version: int,
    events: List[Event],
) -> None:
Appends a list of domain events to the stream for a given aggregate.

    • aggregate_id: The ID of the aggregate to which the events belong.
    • expected_version: The version of the aggregate that these events are based on. This is crucial for optimistic concurrency. For a new aggregate, this is typically -1.
    • events: A list of Event instances to append.

Raises EventStoreConflictError if expected_version doesn't match the stream's current version.
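The version check can be sketched with two competing writers. This is a minimal illustration, assuming a module-level dictionary as storage, plain strings as events, and a ConflictError class standing in for EventStoreConflictError; none of these names come from the library.

```python
import asyncio
from typing import Dict, List


class ConflictError(RuntimeError):
    """Hypothetical stand-in for EventStoreConflictError."""


# Hypothetical storage: aggregate ID -> ordered list of event payloads.
streams: Dict[str, List[str]] = {}


async def append_events(aggregate_id: str, expected_version: int,
                        events: List[str]) -> None:
    stream = streams.setdefault(aggregate_id, [])
    current_version = len(stream) - 1  # -1 while the stream is empty
    if current_version != expected_version:
        raise ConflictError(
            f"expected {expected_version}, found {current_version}")
    stream.extend(events)


async def demo() -> List[str]:
    outcome = []
    # Both writers loaded the empty aggregate, so both expect version -1.
    await append_events("cart-1", -1, ["CartCreated"])
    outcome.append("writer A appended")
    try:
        await append_events("cart-1", -1, ["CartCreated"])
    except ConflictError:
        outcome.append("writer B rejected")
    # Writer B reloads, sees version 0, and retries against that version.
    await append_events("cart-1", 0, ["ItemAdded"])
    outcome.append("writer B appended after reload")
    return outcome


result = asyncio.run(demo())
print(result)
```

The second append fails because the stream has already moved from version -1 to 0. Only after basing its events on the new version does writer B succeed.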

load_events

async def load_events(
    self,
    aggregate_id: TAggregateId,
    from_version: Optional[int] = None,
) -> List[Event]:
Loads the stream of events for a given aggregate.

    • aggregate_id: The ID of the aggregate whose events are to be loaded.
    • from_version: Optionally, the version after which events should be loaded. If None, all events for the aggregate are loaded. For example, if from_version=0, events with sequence numbers 1, 2, ... are loaded.

Returns an ordered list of Event instances. If the aggregate has no events or doesn't exist, an empty list is returned.
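The "after" semantics of from_version amount to a slice that starts one position past the given version. The helper below is a hypothetical sketch over a plain list of strings, not the library's implementation.

```python
from typing import List, Optional


def events_after(stream: List[str],
                 from_version: Optional[int] = None) -> List[str]:
    """Return events whose 0-indexed sequence number exceeds from_version."""
    if from_version is None:
        return list(stream)
    return stream[from_version + 1:]


stream = ["Opened", "Deposited", "Withdrawn"]  # sequence numbers 0, 1, 2

everything = events_after(stream)       # all three events
after_first = events_after(stream, 0)   # events 1 and 2
after_last = events_after(stream, 2)    # nothing newer than version 2
print(everything, after_first, after_last)
```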

get_current_version

async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
Retrieves the current version of the event stream for a given aggregate. The version is typically the sequence number of the last event in the stream (0-indexed).

    • aggregate_id: The ID of the aggregate.

Returns the current version (e.g., 0 if one event exists, 1 if two events exist, etc.) or None if the aggregate stream doesn't exist or has no events.
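Since an empty stream reports None while append_events typically expects -1 for a new aggregate, callers may need a small conversion between the two conventions. Both helpers below are hypothetical and only restate the arithmetic described above.

```python
from typing import Optional


def version_for_event_count(count: int) -> Optional[int]:
    """Map a stream length to its 0-indexed version (None when empty)."""
    return None if count == 0 else count - 1


def expected_version_from(current: Optional[int]) -> int:
    """Turn a get_current_version() result into an expected_version argument."""
    return -1 if current is None else current


print(version_for_event_count(0), version_for_event_count(2))
print(expected_version_from(None), expected_version_from(4))
```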

Relationship with Aggregates and Repositories

When using Event Sourcing, your AggregateRepository implementation would interact with the EventStore:

  1. Saving an Aggregate:

    • The repository retrieves uncommitted events from the aggregate (e.g., aggregate.pull_uncommitted_events()).
    • It then calls event_store.append_events() with the aggregate's ID, its current version (as expected_version), and the list of new events.
    • The aggregate's version is then updated based on the number of successfully appended events.
  2. Loading an Aggregate:

    • The repository calls event_store.load_events() for the given aggregate ID.
    • A new, empty aggregate instance is created.
    • The loaded events are replayed on this instance one by one (e.g., via an aggregate.apply(event) method), reconstructing its state.
    • The aggregate's version is set to the version of the last applied event.

This approach ensures that the aggregate's state is always derived from its history of events, which are durably persisted in the Event Store.
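The save and load steps above can be sketched as follows. Everything here is an assumption made for illustration: the Account aggregate, the Deposited event, the AccountRepository class, and the DictEventStore stand-in are hypothetical and do not reflect the library's actual classes.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Deposited:  # hypothetical domain event
    amount: int


class Account:  # hypothetical event-sourced aggregate
    def __init__(self, account_id: str) -> None:
        self.id = account_id
        self.version = -1  # no events persisted yet
        self.balance = 0
        self._uncommitted: List[Deposited] = []

    def deposit(self, amount: int) -> None:
        event = Deposited(amount)
        self.apply(event)
        self._uncommitted.append(event)

    def apply(self, event: Deposited) -> None:
        self.balance += event.amount

    def pull_uncommitted_events(self) -> List[Deposited]:
        events, self._uncommitted = self._uncommitted, []
        return events


class DictEventStore:  # minimal stand-in for an EventStore implementation
    def __init__(self) -> None:
        self.streams: Dict[str, List[Deposited]] = {}

    async def append_events(self, aggregate_id, expected_version, events):
        stream = self.streams.setdefault(aggregate_id, [])
        if len(stream) - 1 != expected_version:
            raise RuntimeError("optimistic concurrency conflict")
        stream.extend(events)

    async def load_events(self, aggregate_id, from_version=None):
        return list(self.streams.get(aggregate_id, []))


class AccountRepository:
    def __init__(self, event_store: DictEventStore) -> None:
        self._store = event_store

    async def save(self, account: Account) -> None:
        events = account.pull_uncommitted_events()
        await self._store.append_events(account.id, account.version, events)
        account.version += len(events)  # advance only after a successful append

    async def get(self, account_id: str) -> Account:
        account = Account(account_id)
        for event in await self._store.load_events(account_id):
            account.apply(event)   # replay history to rebuild state
            account.version += 1   # ends at the last applied event's version
        return account


async def demo():
    repo = AccountRepository(DictEventStore())
    account = Account("acc-1")
    account.deposit(50)
    account.deposit(25)
    await repo.save(account)
    reloaded = await repo.get("acc-1")
    return account.version, reloaded.balance, reloaded.version


saved_version, reloaded_balance, reloaded_version = asyncio.run(demo())
print(saved_version, reloaded_balance, reloaded_version)
```

Note that the aggregate's version is advanced only after append_events returns, so a conflict leaves the in-memory version untouched.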

Abstract Base Class Definition

The core abstraction is defined in castlecraft_engineer.abstractions.event_store:

import abc
from typing import List, Optional, TypeVar, Generic
from uuid import UUID # Example, TAggregateId can be any type

from castlecraft_engineer.abstractions.event import Event

TAggregateId = TypeVar("TAggregateId")

class EventStoreConflictError(RuntimeError):
    ...  # Body as defined above

class EventStore(Generic[TAggregateId], abc.ABC):
    @abc.abstractmethod
    async def append_events(
        self,
        aggregate_id: TAggregateId,
        expected_version: int,
        events: List[Event],
    ) -> None:
        raise NotImplementedError

    @abc.abstractmethod
    async def load_events(
        self,
        aggregate_id: TAggregateId,
        from_version: Optional[int] = None,
    ) -> List[Event]:
        raise NotImplementedError

    @abc.abstractmethod
    async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
        raise NotImplementedError
Concrete implementations, like InMemoryEventStore for testing or a database-backed store for production, will inherit from this ABC.
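To show what implementing the ABC might involve, here is a minimal dictionary-backed sketch. It is not the library's InMemoryEventStore: the DictBackedEventStore name is hypothetical, Event is a local stand-in for castlecraft_engineer.abstractions.event.Event, and the ABC and exception are trimmed local copies so that the example runs on its own. It also assumes aggregate IDs are hashable.

```python
import abc
import asyncio
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional, TypeVar
from uuid import UUID, uuid4

TAggregateId = TypeVar("TAggregateId")


@dataclass(frozen=True)
class Event:  # local stand-in for the library's Event class
    name: str


class EventStoreConflictError(RuntimeError):  # trimmed local copy
    def __init__(self, aggregate_id, expected_version, actual_version=None):
        self.aggregate_id = aggregate_id
        self.expected_version = expected_version
        self.actual_version = actual_version
        super().__init__(f"Conflict appending events for {aggregate_id}.")


class EventStore(Generic[TAggregateId], abc.ABC):  # trimmed local copy
    @abc.abstractmethod
    async def append_events(self, aggregate_id: TAggregateId,
                            expected_version: int,
                            events: List[Event]) -> None: ...

    @abc.abstractmethod
    async def load_events(self, aggregate_id: TAggregateId,
                          from_version: Optional[int] = None) -> List[Event]: ...

    @abc.abstractmethod
    async def get_current_version(
            self, aggregate_id: TAggregateId) -> Optional[int]: ...


class DictBackedEventStore(EventStore[TAggregateId]):
    """Hypothetical store keeping each stream as a list in a dict."""

    def __init__(self) -> None:
        self._streams: Dict[TAggregateId, List[Event]] = {}

    async def append_events(self, aggregate_id, expected_version, events):
        stream = self._streams.get(aggregate_id, [])
        actual = len(stream) - 1  # -1 for a missing or empty stream
        if actual != expected_version:
            raise EventStoreConflictError(
                aggregate_id, expected_version, actual if stream else None)
        # No await between the check and the write, so within a single
        # event loop the two steps cannot be interleaved.
        self._streams[aggregate_id] = stream + list(events)

    async def load_events(self, aggregate_id, from_version=None):
        stream = self._streams.get(aggregate_id, [])
        start = 0 if from_version is None else from_version + 1
        return list(stream[start:])

    async def get_current_version(self, aggregate_id):
        stream = self._streams.get(aggregate_id)
        return len(stream) - 1 if stream else None


async def demo():
    store: DictBackedEventStore[UUID] = DictBackedEventStore()
    aggregate_id = uuid4()
    await store.append_events(
        aggregate_id, -1, [Event("Created"), Event("Renamed")])
    version = await store.get_current_version(aggregate_id)
    tail = await store.load_events(aggregate_id, from_version=0)
    missing = await store.get_current_version(uuid4())
    return version, [e.name for e in tail], missing


version, tail_names, missing_version = asyncio.run(demo())
print(version, tail_names, missing_version)
```

A store like this is only suitable for tests, since events vanish when the process exits. A production implementation would typically enforce the version check inside the storage engine itself, for example through a uniqueness constraint or a transactional compare-and-append.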