Skip to content

castlecraft_engineer.abstractions.projector

castlecraft_engineer.abstractions.projector

Projector

Bases: ABC

Abstract base class for projectors. Projectors are responsible for consuming events and updating read models or performing other side effects based on those events.

Source code in src/castlecraft_engineer/abstractions/projector.py
class Projector(abc.ABC):
    """
    Abstract base class for projectors.
    Projectors are responsible for consuming events and updating read models
    or performing other side effects based on those events.
    """

    def __init__(self, projection_store: ProjectionStore):
        self._event_handlers: Dict[
            Type[Event],
            List[
                Union[
                    Callable[[Event], None],
                    Callable[[Event], Coroutine[Any, Any, None]],
                ]
            ],
        ] = {}
        # Store to manage projection's last processed event
        self._projection_store = projection_store
        self._register_event_handlers()

    @property
    @abc.abstractmethod
    def projection_id(self) -> ProjectionId:
        """
        A unique identifier for this projector or the projection it manages.
        Used to store and retrieve its processing state.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _register_event_handlers(self) -> None:
        """
        Subclasses must implement this method to register handlers for specific event types.
        Example: self._register_handler(MyEvent, self._handle_my_event)
        """
        raise NotImplementedError

    def _register_handler(
        self,
        event_type: Type[Event],
        handler: Union[
            Callable[[Event], None], Callable[[Event], Coroutine[Any, Any, None]]
        ],
    ) -> None:
        """Registers a method to handle a specific event type."""
        if not issubclass(event_type, Event):
            raise TypeError(f"event_type must be a subclass of Event, got {event_type}")
        if not callable(handler):
            raise TypeError(f"handler must be callable, got {handler}")

        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    async def handle_event(self, event: Event) -> None:
        """
        Handles an incoming event by dispatching it to registered handlers.
        This method should be called by the event stream consumer.
        It also updates the projection state after successful handling.
        """
        event_type = type(event)
        handlers = self._event_handlers.get(event_type)

        if not handlers:
            # Optionally log or ignore events the projector is not interested in
            logger.debug(
                f"Projector {self.projection_id} has no handler for event type {event_type.__name__}"
            )
            return

        try:
            for handler in handlers:
                # Depending on whether handlers are async or sync, you might need to adjust.
                # For simplicity, assuming sync handlers here. If async, use `await handler(event)`.
                # If handlers can be async, this method should be `async def handle_event`.
                # For now, let's assume handlers are synchronous for simplicity in the ABC.
                # Concrete projectors can decide if their handlers are async.
                # If a handler is async, the `handle_event` method itself should be async.
                # Let's make it async to be more flexible.
                await self._invoke_handler(handler, event)

            # After all handlers for this event type have successfully processed the event,
            # update the projection state.
            # This assumes the event has an `event_id` and `occurred_on` attribute.
            # You might need to adapt this based on your Event base class.
            projection_state = await self._projection_store.get_projection_state(
                self.projection_id
            )
            if projection_state is None:
                # This import is fine here as it's for a specific type
                from castlecraft_engineer.abstractions.projection import ProjectionState

                projection_state = ProjectionState(projection_id=self.projection_id)

            projection_state.update_progress(
                event_id=getattr(
                    event, "event_id", None
                ),  # Adapt if your Event has a different ID field
                event_timestamp=getattr(
                    event, "occurred_on", None
                ),  # Adapt for timestamp
            )
            await self._projection_store.save_projection_state(projection_state)

        except Exception as e:
            logger.error(
                f"Error handling event {event_type.__name__} in projector {self.projection_id}: {e}",
                exc_info=True,
            )
            # Decide on error handling strategy: re-raise, log and skip, move to dead-letter queue, etc.
            raise  # Re-raise by default to let the caller handle it

    async def _invoke_handler(
        self,
        handler: Union[
            Callable[[Event], None], Callable[[Event], Coroutine[Any, Any, None]]
        ],
        event: Event,
    ) -> None:
        """Helper to invoke a handler, allowing for async handlers in subclasses if needed."""
        # This basic implementation assumes synchronous handlers.
        # If your handlers can be async, this method (and `handle_event`) should be more sophisticated
        # or the projector itself should be designed for async handlers.
        # For now, let's assume the handler itself can be async.
        import inspect

        if inspect.iscoroutinefunction(handler):
            await handler(event)
        else:
            handler(event)  # type: ignore

    async def get_last_processed_event_details(
        self,
    ) -> tuple[Optional[Any], Optional[Any]]:
        """
        Retrieves the ID and timestamp of the last event processed by this projector.
        Useful for resuming event stream consumption.
        """
        state = await self._projection_store.get_projection_state(self.projection_id)
        if state:
            return state.last_processed_event_id, state.last_processed_event_timestamp
        return None, None

projection_id abstractmethod property

A unique identifier for this projector or the projection it manages. Used to store and retrieve its processing state.

get_last_processed_event_details() async

Retrieves the ID and timestamp of the last event processed by this projector. Useful for resuming event stream consumption.

Source code in src/castlecraft_engineer/abstractions/projector.py
async def get_last_processed_event_details(
    self,
) -> tuple[Optional[Any], Optional[Any]]:
    """
    Retrieves the ID and timestamp of the last event processed by this projector.
    Useful for resuming event stream consumption.
    """
    state = await self._projection_store.get_projection_state(self.projection_id)
    if state:
        return state.last_processed_event_id, state.last_processed_event_timestamp
    return None, None

handle_event(event) async

Handles an incoming event by dispatching it to registered handlers. This method should be called by the event stream consumer. It also updates the projection state after successful handling.

Source code in src/castlecraft_engineer/abstractions/projector.py
async def handle_event(self, event: Event) -> None:
    """
    Handles an incoming event by dispatching it to registered handlers.
    This method should be called by the event stream consumer.
    It also updates the projection state after successful handling.
    """
    event_type = type(event)
    handlers = self._event_handlers.get(event_type)

    if not handlers:
        # Optionally log or ignore events the projector is not interested in
        logger.debug(
            f"Projector {self.projection_id} has no handler for event type {event_type.__name__}"
        )
        return

    try:
        for handler in handlers:
            # Depending on whether handlers are async or sync, you might need to adjust.
            # For simplicity, assuming sync handlers here. If async, use `await handler(event)`.
            # If handlers can be async, this method should be `async def handle_event`.
            # For now, let's assume handlers are synchronous for simplicity in the ABC.
            # Concrete projectors can decide if their handlers are async.
            # If a handler is async, the `handle_event` method itself should be async.
            # Let's make it async to be more flexible.
            await self._invoke_handler(handler, event)

        # After all handlers for this event type have successfully processed the event,
        # update the projection state.
        # This assumes the event has an `event_id` and `occurred_on` attribute.
        # You might need to adapt this based on your Event base class.
        projection_state = await self._projection_store.get_projection_state(
            self.projection_id
        )
        if projection_state is None:
            # This import is fine here as it's for a specific type
            from castlecraft_engineer.abstractions.projection import ProjectionState

            projection_state = ProjectionState(projection_id=self.projection_id)

        projection_state.update_progress(
            event_id=getattr(
                event, "event_id", None
            ),  # Adapt if your Event has a different ID field
            event_timestamp=getattr(
                event, "occurred_on", None
            ),  # Adapt for timestamp
        )
        await self._projection_store.save_projection_state(projection_state)

    except Exception as e:
        logger.error(
            f"Error handling event {event_type.__name__} in projector {self.projection_id}: {e}",
            exc_info=True,
        )
        # Decide on error handling strategy: re-raise, log and skip, move to dead-letter queue, etc.
        raise  # Re-raise by default to let the caller handle it