Skip to content

castlecraft_engineer.abstractions.event_sourced_aggregate

castlecraft_engineer.abstractions.event_sourced_aggregate

EventSourcedAggregate

Bases: Aggregate[TAggregateId], Generic[TAggregateId], ABC

Base class for Aggregates that are event-sourced. It extends the base Aggregate with event application logic.

Source code in src/castlecraft_engineer/abstractions/event_sourced_aggregate.py
class EventSourcedAggregate(Aggregate[TAggregateId], Generic[TAggregateId], abc.ABC):
    """
    Base class for Aggregates that are event-sourced.
    It extends the base Aggregate with event application logic.
    """

    def __init__(self, id: TAggregateId, version: int = -1) -> None:
        super().__init__(id, version)
        # _event_appliers maps event types to methods that apply them
        self._event_appliers: Dict[Type[Event], Callable[[Event], None]] = {}
        self._register_event_appliers()

    @abc.abstractmethod
    def _register_event_appliers(self) -> None:
        """
        Subclasses must implement this method to register their event appliers.
        Example: self._register_applier(MyEvent, self._apply_my_event)
        """

    def _register_applier(
        self, event_type: Type[Event], applier: Callable[[Event], None]
    ) -> None:
        """Registers a method to apply a specific event type."""
        if not issubclass(event_type, Event):
            raise TypeError(f"event_type must be a subclass of Event, got {event_type}")
        if not callable(applier):
            raise TypeError(f"applier must be callable, got {applier}")
        self._event_appliers[event_type] = applier

    def _handle_unapplied_event(self, event: Event) -> None:
        """
        Hook method called when no applier is found for an event.
        Default behavior is to log a warning. Subclasses can override
        to raise an error or implement other custom logic.
        """
        logger.warning(
            f"No event applier registered for event type {type(event).__name__} "
            f"in {self.__class__.__name__}. Event was not applied to state."
        )

    def _apply_event(self, event: Event) -> None:
        """
        Applies an event to the aggregate, changing its state.
        This method finds the registered applier for the event type and calls it.
        It also increments the aggregate's version.
        """
        applier = self._event_appliers.get(type(event))
        if not applier:
            self._handle_unapplied_event(event)
        else:
            applier(event)

        # Version is incremented for each applied event
        self._increment_version()

    def _record_and_apply_event(self, event: Event) -> None:
        """
        Records an event and then applies it to the aggregate.
        This is the primary way domain methods should effect state changes.
        """
        # From base Aggregate: adds to _uncommitted_events
        self._record_event(event)

        # Applies to current state and increments version
        self._apply_event(event)

    @classmethod
    def load_from_history(
        cls: Type[ESA], aggregate_id: TAggregateId, history: List[Event]
    ) -> ESA:
        """
        Reconstructs an aggregate instance from its event history.
        The initial version is -1, and each applied event increments it.
        """
        # Initial version is -1, as per Aggregate base class for a new/empty instance
        instance = cls(id=aggregate_id, version=-1)
        for event in history:
            instance._apply_event(event)
        # After applying all historical events, clear uncommitted events
        # as these are already persisted.
        instance._uncommitted_events.clear()
        return instance

    def create_snapshot(self) -> Snapshot[TAggregateId]:
        """
        Creates a snapshot of the aggregate's current state and version.
        Subclasses might need to override this if their state isn't easily
        serializable from public attributes or if they need custom serialization.
        """
        state_to_snapshot = self._get_snapshot_state()
        return Snapshot(
            aggregate_id=self.id,
            aggregate_state=state_to_snapshot,
            version=self.version,
        )

    @abc.abstractmethod
    def _get_snapshot_state(self) -> Any:
        """
        Returns the serializable state of the aggregate for snapshotting.
        Subclasses must implement this to define what data constitutes their snapshot.
        """

    @abc.abstractmethod
    def _apply_snapshot_state(self, state: Any) -> None:
        """
        Applies a deserialized state from a snapshot to the aggregate.
        Subclasses must implement this to restore their state from snapshot data.
        """

create_snapshot()

Creates a snapshot of the aggregate's current state and version. Subclasses might need to override this if their state isn't easily serializable from public attributes or if they need custom serialization.

Source code in src/castlecraft_engineer/abstractions/event_sourced_aggregate.py
def create_snapshot(self) -> Snapshot[TAggregateId]:
    """
    Creates a snapshot of the aggregate's current state and version.
    Subclasses might need to override this if their state isn't easily
    serializable from public attributes or if they need custom serialization.
    """
    state_to_snapshot = self._get_snapshot_state()
    return Snapshot(
        aggregate_id=self.id,
        aggregate_state=state_to_snapshot,
        version=self.version,
    )

load_from_history(aggregate_id, history) classmethod

Reconstructs an aggregate instance from its event history. The initial version is -1, and each applied event increments it.

Source code in src/castlecraft_engineer/abstractions/event_sourced_aggregate.py
@classmethod
def load_from_history(
    cls: Type[ESA], aggregate_id: TAggregateId, history: List[Event]
) -> ESA:
    """
    Reconstructs an aggregate instance from its event history.
    The initial version is -1, and each applied event increments it.
    """
    # Initial version is -1, as per Aggregate base class for a new/empty instance
    instance = cls(id=aggregate_id, version=-1)
    for event in history:
        instance._apply_event(event)
    # After applying all historical events, clear uncommitted events
    # as these are already persisted.
    instance._uncommitted_events.clear()
    return instance