
Projectors and Read Models

In a CQRS and Event Sourcing architecture, the write side focuses on processing commands and persisting events. The read side, however, often requires data in a format optimized for querying and display. Projectors are components responsible for consuming events from the event stream and transforming them into these queryable read models (also known as projections).

What is a Projector?

A projector subscribes to a stream of Events and updates one or more read models based on the information contained in those events. Read models are typically denormalized views of data tailored for specific query needs, often stored in a separate database or data store optimized for reads (e.g., a NoSQL database, a search index, or even simple database tables).

Castlecraft Engineer provides an abstract base class Projector to help build these components.

Key Responsibilities of a Projector

  • Event Handling: Listen to specific domain events.
  • Data Transformation: Extract relevant data from events.
  • Read Model Updates: Create, update, or delete records in the read model store(s).
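  • Idempotency: Projectors should ideally be idempotent. Processing the same event more than once should leave the read model in the same state as processing it once. This matters for two reasons. It supports resilience, because many delivery mechanisms guarantee only at-least-once delivery. It also supports replayability, because a replay may re-send events that the read model already reflects.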
  • State Management: Projectors often need to track their progress (e.g., the last event ID or timestamp processed) to resume processing after a restart. The ProjectionStore and ProjectionState abstractions help with this.
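The idempotency requirement can be illustrated with a small in-memory sketch. The OrderShipped event, the OrderSummaryReadModel class, and its applied_event_ids guard are hypothetical and do not come from the library. The sketch combines two common techniques. The first is absolute writes, such as setting a status instead of toggling it. The second is skipping event IDs that have already been applied, which also protects operations like counters that are not naturally idempotent.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderShipped:
    """Hypothetical event type, used only for this illustration."""
    event_id: str
    order_id: str


class OrderSummaryReadModel:
    """In-memory stand-in for a read model table: order_id -> row."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        # IDs of events already applied. A real store would persist these
        # alongside the rows, ideally in the same transaction.
        self.applied_event_ids: set[str] = set()

    def apply_order_shipped(self, event: OrderShipped) -> None:
        if event.event_id in self.applied_event_ids:
            return  # Duplicate delivery: nothing to do.
        row = self.rows.setdefault(event.order_id, {"status": "PENDING", "updates": 0})
        row["status"] = "SHIPPED"  # Absolute write: safe to repeat.
        row["updates"] += 1        # Not idempotent by itself; protected by the ID check.
        self.applied_event_ids.add(event.event_id)


read_model = OrderSummaryReadModel()
shipped = OrderShipped(event_id="evt-1", order_id="order-1")
for _ in range(3):  # Simulate the same event being delivered three times.
    read_model.apply_order_shipped(shipped)

print(read_model.rows["order-1"])  # {'status': 'SHIPPED', 'updates': 1}
```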

The Projector Abstraction

The castlecraft_engineer.abstractions.projector.Projector class provides a base for your projectors:

  • projection_id (abstract property): A unique string identifier for the projector. This ID is used with a ProjectionStore to save and retrieve its processing state.
  • _register_event_handlers() (abstract method): Subclasses must implement this method to declare which events they handle and which methods handle them. They do this by calling self._register_handler(EventType, self._handle_event_type) for each event type.
  • handle_event(event: Event) (async method): Called by an event stream consumer (e.g., a background worker subscribing to an event bus or message queue). It dispatches the event to the appropriate registered handler(s). After successful processing, it updates the projector's state in the ProjectionStore.
  • _projection_store: The ProjectionStore instance passed during initialization, used to manage the projector's progress.

Example of a simple Projector:

from castlecraft_engineer.abstractions.projector import Projector, ProjectionId
from castlecraft_engineer.abstractions.projection_store import ProjectionStore
from castlecraft_engineer.abstractions.event import Event
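# OrderCreated and OrderShipped are assumed to be Event subclasses defined elsewhere.
# Both carry an order_id field, and OrderCreated also carries customer_name.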

class OrderSummaryProjector(Projector):
    def __init__(self, projection_store: ProjectionStore, read_model_db_conn):
        super().__init__(projection_store)
        self.db = read_model_db_conn # Your read model database connection/client

    @property
    def projection_id(self) -> ProjectionId:
        return "order_summary_v1"

    def _register_event_handlers(self) -> None:
        self._register_handler(OrderCreated, self._on_order_created)
        self._register_handler(OrderShipped, self._on_order_shipped)

    async def _on_order_created(self, event: OrderCreated) -> None:
        # Logic to create a new record in the order summary read model
        # await self.db.execute_query(
        #    "INSERT INTO order_summaries (id, customer_name, status) VALUES (?, ?, ?)",
        #    event.order_id, event.customer_name, "PENDING"
        # )
        print(f"Projector: Order {event.order_id} created.")

    async def _on_order_shipped(self, event: OrderShipped) -> None:
        # Logic to update the status in the order summary read model
        # await self.db.execute_query(
        #    "UPDATE order_summaries SET status = ? WHERE id = ?",
        #    "SHIPPED", event.order_id
        # )
        print(f"Projector: Order {event.order_id} shipped.")
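The dispatch mechanics behind _register_handler and handle_event can be shown without the library installed. The sketch below is a hypothetical, self-contained stand-in, not the castlecraft_engineer implementation. MiniProjector, the minimal Event dataclass, and the in-memory summaries dict are all assumptions made for illustration. Dispatching on the exact event type is also an assumption about how matching works.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

# Handlers receive a concrete event subclass, so the parameter type is left open.
Handler = Callable[[Any], Awaitable[None]]


@dataclass(frozen=True)
class Event:
    """Hypothetical minimal event base; the library's Event class may differ."""
    event_id: str


@dataclass(frozen=True)
class OrderCreated(Event):
    order_id: str
    customer_name: str


@dataclass(frozen=True)
class OrderShipped(Event):
    order_id: str


class MiniProjector(ABC):
    """Hypothetical stand-in for Projector showing register-then-dispatch."""

    def __init__(self) -> None:
        self._handlers: defaultdict[type, list[Handler]] = defaultdict(list)
        self._register_event_handlers()

    @abstractmethod
    def _register_event_handlers(self) -> None: ...

    def _register_handler(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def handle_event(self, event: Event) -> None:
        # Dispatch on the exact event type. Events with no handler are ignored.
        # (Per the description above, the real base class also saves progress
        # to its ProjectionStore after successful handling.)
        for handler in self._handlers.get(type(event), []):
            await handler(event)


class OrderSummaryProjector(MiniProjector):
    def __init__(self) -> None:
        self.summaries: dict[str, dict] = {}  # In-memory read model.
        super().__init__()

    def _register_event_handlers(self) -> None:
        self._register_handler(OrderCreated, self._on_order_created)
        self._register_handler(OrderShipped, self._on_order_shipped)

    async def _on_order_created(self, event: OrderCreated) -> None:
        self.summaries[event.order_id] = {"customer_name": event.customer_name, "status": "PENDING"}

    async def _on_order_shipped(self, event: OrderShipped) -> None:
        self.summaries[event.order_id]["status"] = "SHIPPED"


async def main() -> dict:
    projector = OrderSummaryProjector()
    await projector.handle_event(OrderCreated("e1", "o1", "Ada"))
    await projector.handle_event(OrderShipped("e2", "o1"))
    return projector.summaries


print(asyncio.run(main()))  # {'o1': {'customer_name': 'Ada', 'status': 'SHIPPED'}}
```

The subclass only declares its handlers. Routing stays in the base class, which keeps each handler small and focused on a single read model change.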

ProjectionState and ProjectionStore

To let projectors resume processing from where they left off (e.g., after a service restart or for incremental updates), their progress must be tracked:

  • ProjectionState: A dataclass (castlecraft_engineer.abstractions.projection.ProjectionState) that typically stores projection_id, last_processed_event_id, and last_processed_event_timestamp.
  • ProjectionStore: An abstraction (castlecraft_engineer.abstractions.projection_store.ProjectionStore) for saving and retrieving ProjectionState. Implementations could use databases, Redis, or other persistent stores.

The Projector base class automatically updates the ProjectionState in its configured ProjectionStore after successfully handling an event.
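The checkpointing idea can be sketched with an in-memory store. ProjectionState below mirrors the fields listed above. However, InMemoryProjectionStore, its get_state/save_state method names, StoredEvent, and the catch_up helper are hypothetical and are not the library's ProjectionStore interface. The sketch assumes the event log is ordered and that the checkpointed event ID appears in it.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ProjectionState:
    # Mirrors the fields described above; the real dataclass may differ.
    projection_id: str
    last_processed_event_id: Optional[str] = None
    last_processed_event_timestamp: Optional[datetime] = None


class InMemoryProjectionStore:
    """Hypothetical store; a real one would persist to a database or Redis."""

    def __init__(self) -> None:
        self._states: dict[str, ProjectionState] = {}

    def get_state(self, projection_id: str) -> ProjectionState:
        return self._states.get(projection_id, ProjectionState(projection_id))

    def save_state(self, state: ProjectionState) -> None:
        self._states[state.projection_id] = state


@dataclass(frozen=True)
class StoredEvent:
    event_id: str
    timestamp: datetime
    order_id: str


def catch_up(log: list[StoredEvent], store: InMemoryProjectionStore,
             projection_id: str, applied: list[str]) -> None:
    state = store.get_state(projection_id)
    start = 0
    if state.last_processed_event_id is not None:
        # Resume just after the checkpoint (raises ValueError if it is not in the log).
        start = [e.event_id for e in log].index(state.last_processed_event_id) + 1
    for event in log[start:]:
        applied.append(event.order_id)  # Stand-in for updating the read model.
        # Checkpoint after each successfully handled event.
        store.save_state(ProjectionState(projection_id, event.event_id, event.timestamp))


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)
log = [StoredEvent(f"e{i}", ts, f"o{i}") for i in range(1, 4)]
store = InMemoryProjectionStore()
applied: list[str] = []
catch_up(log[:2], store, "order_summary_v1", applied)  # Handles e1, e2, then "restarts".
catch_up(log, store, "order_summary_v1", applied)      # Resumes and handles only e3.
print(applied)  # ['o1', 'o2', 'o3']
```

In this sketch, rebuilding a read model from scratch amounts to clearing it and discarding the saved state, after which catch_up starts again from the first event.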

Consuming Events

Projectors are typically driven by an event stream. This could be:

  • An in-process EventBus for local projections.
  • A subscription to an external message broker (such as Kafka or RabbitMQ) via an EventStreamConsumer.
  • A batch process that periodically polls the EventStore for new events (less common for real-time updates, but possible).

In each case, the consuming mechanism fetches events and passes them to the projector's handle_event method.
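The following is a hypothetical in-process consumer. An asyncio.Queue stands in for an event bus or broker subscription. A worker task takes each event off the queue and passes it to the handle_event method of every registered projector, so two differently shaped read models are built from one stream. The projector classes are minimal stand-ins that only expose handle_event. The None sentinel used for shutdown is one design choice among several.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreated:
    """Hypothetical event type for this illustration."""
    order_id: str
    customer: str
    amount: int


class OrdersByCustomerProjector:
    """Read model 1: order IDs grouped by customer (set insert is idempotent)."""

    def __init__(self) -> None:
        self.orders_by_customer: dict[str, set[str]] = {}

    async def handle_event(self, event: OrderCreated) -> None:
        self.orders_by_customer.setdefault(event.customer, set()).add(event.order_id)


class OrderAmountsProjector:
    """Read model 2: amount per order (upsert is idempotent), from the same stream."""

    def __init__(self) -> None:
        self.amounts: dict[str, int] = {}

    async def handle_event(self, event: OrderCreated) -> None:
        self.amounts[event.order_id] = event.amount


async def consume(queue: asyncio.Queue, projectors: list) -> None:
    while True:
        event = await queue.get()
        if event is None:  # Sentinel: stop consuming.
            break
        for projector in projectors:  # Fan each event out to every projector.
            await projector.handle_event(event)


async def main() -> tuple[OrdersByCustomerProjector, OrderAmountsProjector]:
    queue: asyncio.Queue = asyncio.Queue()
    by_customer, amounts = OrdersByCustomerProjector(), OrderAmountsProjector()
    worker = asyncio.create_task(consume(queue, [by_customer, amounts]))
    for event in (OrderCreated("o1", "ada", 30), OrderCreated("o2", "ada", 12)):
        await queue.put(event)  # Stand-in for events arriving from a bus or broker.
    await queue.put(None)
    await worker
    return by_customer, amounts


by_customer, amounts = asyncio.run(main())
print(sorted(by_customer.orders_by_customer["ada"]), sum(amounts.amounts.values()))  # ['o1', 'o2'] 42
```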

Benefits of Projectors

  • Optimized Queries: Read models can be tailored for specific query patterns, leading to faster query performance.
  • Decoupling: Separates read concerns from write concerns, aligning with CQRS.
  • Scalability: Read models can be scaled independently of the write-side system.
  • Flexibility: Allows for multiple, different read models to be generated from the same event stream to serve various application needs or UI views.

Considerations

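  • Eventual Consistency: Read models are updated asynchronously as events are processed. As a result, there is a delay between when a command is executed (and its event is generated) and when the read model reflects that change. The delay is usually short, but it grows if a projector falls behind or is stopped. A client that queries immediately after issuing a command may not yet see its own change.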
  • Data Duplication: Read models often involve denormalizing and duplicating data, which is a trade-off for query performance.
  • Replaying Projections: If a projector's logic changes or a bug is fixed, you might need to rebuild the read model by replaying events from a certain point (or from the beginning). Having a ProjectionStore helps manage the starting point for such replays.
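  • Error Handling: Projectors need a deliberate strategy for events they fail to process. Common options are retries for transient failures, such as an unavailable read store. Dead-letter queues set aside events that keep failing so the rest of the stream can proceed. Manual intervention is the remaining option.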
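The following sketch shows one common strategy: retries with exponential backoff, followed by dead-lettering. It assumes the handler is a coroutine that may raise. process_with_retries, max_attempts, base_delay, and the dead_letters list are hypothetical names. In production, the dead-letter destination would typically be a broker queue or a database table rather than a list.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def process_with_retries(
    handle: Callable[[Any], Awaitable[None]],
    event: Any,
    dead_letters: list,
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> bool:
    """Try to handle an event; after max_attempts failures, dead-letter it.

    Returns True if the event was handled, False if it was dead-lettered
    (or if max_attempts < 1, in which case nothing is attempted).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await handle(event)
            return True
        except Exception as exc:  # In practice, retry only errors known to be transient.
            if attempt == max_attempts:
                dead_letters.append((event, repr(exc)))
                return False
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False


calls = {"flaky": 0}


async def flaky_handler(event: str) -> None:
    # Fails twice, then succeeds, like a briefly unavailable read store.
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("read model store unavailable")


async def broken_handler(event: str) -> None:
    raise ValueError(f"cannot project {event}")


async def main() -> tuple[bool, bool, list]:
    dead: list = []
    ok = await process_with_retries(flaky_handler, "evt-1", dead)
    failed = await process_with_retries(broken_handler, "evt-2", dead)
    return ok, failed, dead


ok, failed, dead = asyncio.run(main())
print(ok, failed, len(dead))  # True False 1
```

Whether a projector's checkpoint should advance past a dead-lettered event is a design decision. Advancing keeps the projection moving. Halting preserves strict ordering at the cost of stalling the read model.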

API Reference

For detailed API information, refer to:

  • Projector API Reference
  • ProjectionState API Reference
  • ProjectionStore API Reference