Events, Event Handlers, and Event Buses

Events are a fundamental concept in Domain-Driven Design (DDD) and CQRS architectures. They represent significant occurrences within the domain, typically as a result of a command being successfully processed. castlecraft-engineer provides abstractions to define, handle, and dispatch events both internally and externally.

1. The Event Base Class

All domain events in your application should inherit from the Event base class, located in castlecraft_engineer.abstractions.event.Event.

  • Purpose: An Event is a data structure (often a dataclass) that captures the facts about something that has happened in the past. It's a message indicating a state change or a significant moment.
  • Characteristics:
    • Named in the past tense (e.g., UserRegisteredEvent, OrderShippedEvent).
    • Should be immutable once created.
    • Carry data relevant to the occurrence but no behavior.
# Example: Defining an Event
from dataclasses import dataclass
import uuid
from datetime import datetime
from castlecraft_engineer.abstractions.event import Event

@dataclass(frozen=True)
class OrderCreatedEvent(Event):
    order_id: uuid.UUID
    customer_id: uuid.UUID
    order_total: float
    created_at: datetime

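The immutability requirement can be checked directly. The following is a minimal sketch, assuming a plain stand-in `Event` class in place of the library's base class (so it runs without castlecraft-engineer installed); `OrderShippedEvent` and its `carrier` field are hypothetical names used only for illustration.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass


class Event:
    """Stand-in for castlecraft_engineer.abstractions.event.Event (assumption)."""


@dataclass(frozen=True)
class OrderShippedEvent(Event):
    # Hypothetical event: named in the past tense, data only, no behavior.
    order_id: uuid.UUID
    carrier: str


event = OrderShippedEvent(order_id=uuid.uuid4(), carrier="ACME")

try:
    event.carrier = "Other"  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False

print(f"event was mutated: {mutated}")
```

Because `frozen=True` blocks assignment after construction, a handler cannot accidentally alter the facts an event carries.
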
2. The EventHandler Base Class

For events that require a reaction within the application, you define EventHandlers. These handlers subscribe to specific event types and perform actions when those events are published. They inherit from castlecraft_engineer.abstractions.event_handler.EventHandler[TEvent].

  • Purpose: To encapsulate the logic that responds to a specific domain event. This could involve updating a read model, sending a notification, triggering another command, or integrating with other parts of the system.
  • Key Method:
    • handle(self, event: TEvent) -> None (or async handle for asynchronous handlers): This method is called by the EventBus when an event of the subscribed type is published. It receives the event object and performs the necessary actions.
# Example: Defining an EventHandler
from castlecraft_engineer.abstractions.event_handler import EventHandler
# Assuming OrderCreatedEvent is defined as above

class SendOrderConfirmationEmailHandler(EventHandler[OrderCreatedEvent]):
    # In a real app, you might inject an email service, logger, etc.
    # def __init__(self, email_service: EmailService):
    #     self.email_service = email_service

    async def handle(self, event: OrderCreatedEvent) -> None:
        print(f"EVENT HANDLED: OrderCreatedEvent for order {event.order_id}")
        print(f"Simulating sending confirmation email to customer {event.customer_id}...")
        # await self.email_service.send_order_confirmation(event.customer_id, event.order_id)
        print(f"Confirmation email for order {event.order_id} would be sent.")

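A handler with an injected dependency can be exercised on its own, without a bus. The sketch below assumes stand-in versions of `EventHandler` and `OrderCreatedEvent` so that it is self-contained; `FakeEmailService` and its `send_order_confirmation` method are hypothetical and only record calls.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Generic, TypeVar

TEvent = TypeVar("TEvent")


class EventHandler(Generic[TEvent]):
    """Stand-in for the library's EventHandler[TEvent] base class (assumption)."""

    async def handle(self, event: TEvent) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class OrderCreatedEvent:
    # Simplified stand-in for the event defined earlier.
    order_id: uuid.UUID
    customer_id: uuid.UUID


class FakeEmailService:
    """Hypothetical collaborator that records calls instead of sending mail."""

    def __init__(self) -> None:
        self.sent = []

    async def send_order_confirmation(self, customer_id, order_id) -> None:
        self.sent.append((customer_id, order_id))


class SendOrderConfirmationEmailHandler(EventHandler[OrderCreatedEvent]):
    def __init__(self, email_service: FakeEmailService) -> None:
        self.email_service = email_service

    async def handle(self, event: OrderCreatedEvent) -> None:
        # React to the event by delegating to the injected service.
        await self.email_service.send_order_confirmation(
            event.customer_id, event.order_id
        )


email_service = FakeEmailService()
handler = SendOrderConfirmationEmailHandler(email_service)
event = OrderCreatedEvent(order_id=uuid.uuid4(), customer_id=uuid.uuid4())
asyncio.run(handler.handle(event))
print(f"confirmations recorded: {len(email_service.sent)}")
```

Keeping the side effect behind an injected service lets the handler logic be tested with a fake, as here.
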
3. The EventBus (In-Process Eventing)

The EventBus (from castlecraft_engineer.abstractions.event_bus.EventBus) is responsible for managing subscriptions and dispatching events to their registered handlers within the same application process.

  • Purpose: To decouple event publishers from event subscribers, allowing for more modular and maintainable code.
  • Key Functionalities:
    • Handler Registration: Event Handlers are registered with the EventBus to listen for specific Event types. This is typically done at application startup, often via a Dependency Injection (DI) container.
      • When the EventBus is set up through ContainerBuilder (from common/di.py), handlers are subscribed with a call such as builder.event_bus.subscribe(EventType, EventHandlerType).
    • Publishing Events:
      • publish(event: Event): Publishes a single event to all subscribed handlers.
      • publish_batch(events: List[Event]): Publishes a list of events. Each event is dispatched to its respective handlers.
    • Error Handling: An EventHandlerRegistrationError (from abstractions/event_bus.py, re-exported in exc.py) might be raised when a handler is registered for an event type that already has one and the bus is configured for a single handler per event type (which is common for direct DI resolution), or when other registration issues occur.
# Example: Registering a handler and publishing an event via EventBus
import asyncio
import uuid
from datetime import datetime
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.event_bus import EventBus

# Assume OrderCreatedEvent and SendOrderConfirmationEmailHandler are defined as above

async def event_bus_example():
    # 1. Configure DI Container
    builder = ContainerBuilder()
    builder.with_event_bus() # Registers EventBus and allows subscription

    # Register the event handler
    # SendOrderConfirmationEmailHandler might have its own dependencies (e.g., EmailService)
    # builder.register(EmailService, ConcreteEmailService) # If needed
    builder.event_bus.subscribe(OrderCreatedEvent, SendOrderConfirmationEmailHandler)

    container = builder.build()

    # 2. Resolve EventBus
    event_bus = container.resolve(EventBus)

    # 3. Create and Publish an Event
    order_event = OrderCreatedEvent(
        order_id=uuid.uuid4(),
        customer_id=uuid.uuid4(),
        order_total=99.99,
        created_at=datetime.utcnow()
    )

    print(f"Publishing event: {order_event}")
    await event_bus.publish(order_event)

    # Example of publishing a batch
    # another_event = OrderCreatedEvent(...)
    # await event_bus.publish_batch([order_event, another_event])

if __name__ == "__main__":
    # Requires OrderCreatedEvent and SendOrderConfirmationEmailHandler
    # to be defined or imported in this module.
    asyncio.run(event_bus_example())

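To make the subscribe/publish flow and the duplicate-registration error concrete, here is a minimal in-memory sketch. It is not the library's EventBus: `InMemoryEventBus`, `RecordingHandler`, and the simplified `OrderCreatedEvent` are hypothetical, `EventHandlerRegistrationError` is a local stand-in, and handlers are subscribed as instances rather than as types resolved through DI.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


class EventHandlerRegistrationError(Exception):
    """Local stand-in for the library exception of the same name."""


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: int


class RecordingHandler:
    """Hypothetical handler that remembers the events it received."""

    def __init__(self) -> None:
        self.seen: List[OrderCreatedEvent] = []

    async def handle(self, event: OrderCreatedEvent) -> None:
        self.seen.append(event)


class InMemoryEventBus:
    """Illustrative bus allowing one handler per event type."""

    def __init__(self) -> None:
        self._handlers: Dict[type, object] = {}

    def subscribe(self, event_type: type, handler: object) -> None:
        if event_type in self._handlers:
            raise EventHandlerRegistrationError(
                f"{event_type.__name__} already has a handler"
            )
        self._handlers[event_type] = handler

    async def publish(self, event: object) -> None:
        handler = self._handlers.get(type(event))
        if handler is not None:
            await handler.handle(event)

    async def publish_batch(self, events) -> None:
        for event in events:  # dispatch in order, one event at a time
            await self.publish(event)


async def main():
    bus = InMemoryEventBus()
    handler = RecordingHandler()
    bus.subscribe(OrderCreatedEvent, handler)
    try:
        bus.subscribe(OrderCreatedEvent, RecordingHandler())
        duplicate_rejected = False
    except EventHandlerRegistrationError:
        duplicate_rejected = True
    await bus.publish(OrderCreatedEvent(order_id=1))
    await bus.publish_batch(
        [OrderCreatedEvent(order_id=2), OrderCreatedEvent(order_id=3)]
    )
    return handler.seen, duplicate_rejected


seen, duplicate_rejected = asyncio.run(main())
print([e.order_id for e in seen], duplicate_rejected)
```

The publisher only knows the bus and the event type, which is the decoupling described above.
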
4. Integrating Events with Aggregates

Aggregates are often the source of domain events. When an aggregate's state changes due to a command, it records one or more events.

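import uuid
from datetime import datetime

from castlecraft_engineer.abstractions.aggregate import Aggregate, Event
# Assume OrderCreatedEvent is defined as above

# Placeholder: assume OrderModel is a SQLModel class used for persistence
class OrderModel:
    pass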

class Order(Aggregate[uuid.UUID, OrderModel]):
    def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
        super().__init__(id, version)
        self.customer_id = customer_id
        self.total = 0.0
        # ... other properties

    @classmethod
    def create_new(cls, customer_id: uuid.UUID, order_total: float) -> "Order":
        order_id = uuid.uuid4()
        order = cls(id=order_id, customer_id=customer_id)
        order.total = order_total
        # Record an event about its creation
        order.record_event(
            OrderCreatedEvent(
                order_id=order.id,
                customer_id=order.customer_id,
                order_total=order.total,
                created_at=datetime.utcnow()
            )
        )
        return order

    # ... other methods that might record events

These recorded events are stored in the aggregate's uncommitted_events list. They are typically pulled using aggregate.pull_uncommitted_events() and published after the aggregate's state has been successfully persisted to the database. This is often managed by a Unit of Work pattern or within the Command Handler after a successful repository save and session commit.

# Snippet from a Command Handler or Application Service
# async def handle_create_order_command(command: CreateOrderCommand, ...):
#     ...
#     order = Order.create_new(command.customer_id, command.total)
#     await order_repository.save(order, session)
#     await session.commit()  # Critical: commit before publishing events
#
#     uncommitted_events = order.pull_uncommitted_events()
#     if uncommitted_events:
#         await event_bus.publish_batch(uncommitted_events)
#     ...

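The commit-then-publish ordering can be sketched end to end with fakes. This is an illustration under stated assumptions, not the library's Aggregate: the stand-in `Order` only mimics `record_event` and `pull_uncommitted_events` (assumed here to clear the pending list when pulled), and `FakeSession` and `FakeEventBus` are hypothetical objects that append to a shared log.

```python
import asyncio
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: uuid.UUID
    order_total: float


class Order:
    """Stand-in aggregate that records events until they are pulled."""

    def __init__(self, order_id: uuid.UUID, total: float) -> None:
        self.id = order_id
        self.total = total
        self._uncommitted_events = []

    def record_event(self, event) -> None:
        self._uncommitted_events.append(event)

    def pull_uncommitted_events(self):
        # Hand back pending events and clear the list (assumed semantics).
        events, self._uncommitted_events = self._uncommitted_events, []
        return events

    @classmethod
    def create_new(cls, total: float) -> "Order":
        order = cls(uuid.uuid4(), total)
        order.record_event(OrderCreatedEvent(order_id=order.id, order_total=total))
        return order


class FakeSession:
    def __init__(self, log) -> None:
        self.log = log

    async def commit(self) -> None:
        self.log.append("commit")


class FakeEventBus:
    def __init__(self, log) -> None:
        self.log = log

    async def publish_batch(self, events) -> None:
        self.log.extend(f"publish:{type(e).__name__}" for e in events)


async def handle_create_order(total, session, event_bus) -> Order:
    order = Order.create_new(total)
    await session.commit()  # persist state first
    events = order.pull_uncommitted_events()
    if events:
        await event_bus.publish_batch(events)  # publish only after the commit
    return order


log = []
order = asyncio.run(handle_create_order(42.0, FakeSession(log), FakeEventBus(log)))
print(log)
```

If the commit raised, the function would exit before any event was published, so handlers would never react to state that was not persisted.
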
5. ExternalEventPublisher (Publishing to External Systems)

For scenarios where events need to be sent to external systems (e.g., message brokers like Kafka, RabbitMQ, or other microservices), castlecraft-engineer provides the ExternalEventPublisher abstract base class from castlecraft_engineer.abstractions.event_publisher.ExternalEventPublisher.

  • Purpose: To define a contract for publishing events to systems outside the current application process.
  • Abstract Methods:
    • async publish(self, event: Event) -> None: Publishes a single event to the external system.
    • async close(self) -> None: Allows for graceful shutdown and cleanup of resources used by the publisher (e.g., closing connections).
  • Implementation: You need to create concrete implementations of ExternalEventPublisher tailored to your specific messaging infrastructure.

This publisher is typically registered with the DI container and can be injected into event handlers that need to forward events externally.

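A concrete publisher might look like the following sketch. The `ExternalEventPublisher` class here is a local stand-in that mirrors only the two abstract methods described above; `InMemoryPublisher`, the JSON message shape, and the simplified `OrderCreatedEvent` are assumptions made for illustration, with a list standing in for a real broker client.

```python
import asyncio
import json
import uuid
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass


class ExternalEventPublisher(ABC):
    """Stand-in mirroring the publish/close contract described above."""

    @abstractmethod
    async def publish(self, event) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: uuid.UUID
    order_total: float


class InMemoryPublisher(ExternalEventPublisher):
    """Hypothetical publisher that appends JSON messages to a list."""

    def __init__(self) -> None:
        self.messages = []
        self.closed = False

    async def publish(self, event) -> None:
        if self.closed:
            raise RuntimeError("publisher is closed")
        payload = {"type": type(event).__name__, "data": asdict(event)}
        # default=str turns values json cannot encode (such as UUID) into strings.
        self.messages.append(json.dumps(payload, default=str))

    async def close(self) -> None:
        self.closed = True


async def main() -> InMemoryPublisher:
    publisher = InMemoryPublisher()
    try:
        await publisher.publish(
            OrderCreatedEvent(order_id=uuid.UUID(int=1), order_total=99.99)
        )
    finally:
        await publisher.close()  # release resources even if publishing fails
    return publisher


publisher = asyncio.run(main())
print(publisher.messages[0])
```

A real implementation would replace the list with a broker client and do its connection cleanup in `close`.
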
6. EventStreamConsumer (Consuming from External Systems)

To consume events from external event streams and integrate them into your application, castlecraft-engineer provides the EventStreamConsumer abstract base class from castlecraft_engineer.abstractions.event_consumer.EventStreamConsumer.

  • Purpose: To listen to an external event source, deserialize incoming messages into domain events, and typically publish them to the internal EventBus for further processing by local handlers.
  • Abstract Methods:
    • async run(self) -> None: Intended to be a long-running method that continuously listens for and processes events from the external stream.
    • async shutdown(self) -> None: For gracefully stopping the consumer and releasing resources.
  • Implementation: You need to create concrete implementations specific to the external event source you are consuming from.

An EventStreamConsumer often takes the internal EventBus as a dependency to dispatch consumed events locally.

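The run/shutdown contract and the hand-off to an internal bus can be sketched as below. Everything here is illustrative: `EventStreamConsumer` is a local stand-in with the two abstract methods described above, an `asyncio.Queue` of JSON strings plays the role of the external stream, and `QueueConsumer`, `RecordingBus`, and the simplified `OrderCreatedEvent` are hypothetical.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass


class EventStreamConsumer(ABC):
    """Stand-in mirroring the run/shutdown contract described above."""

    @abstractmethod
    async def run(self) -> None: ...

    @abstractmethod
    async def shutdown(self) -> None: ...


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: str
    order_total: float


class RecordingBus:
    """Hypothetical internal bus that records published events."""

    def __init__(self) -> None:
        self.published = []

    async def publish(self, event) -> None:
        self.published.append(event)


class QueueConsumer(EventStreamConsumer):
    _STOP = object()  # sentinel that tells run() to exit

    def __init__(self, queue: asyncio.Queue, event_bus: RecordingBus) -> None:
        self._queue = queue
        self._event_bus = event_bus

    async def run(self) -> None:
        while True:
            message = await self._queue.get()
            if message is self._STOP:
                break
            data = json.loads(message)  # deserialize the external message
            await self._event_bus.publish(OrderCreatedEvent(**data))

    async def shutdown(self) -> None:
        await self._queue.put(self._STOP)


async def main():
    queue = asyncio.Queue()
    bus = RecordingBus()
    consumer = QueueConsumer(queue, bus)
    task = asyncio.create_task(consumer.run())
    await queue.put(json.dumps({"order_id": "A-1", "order_total": 10.0}))
    await consumer.shutdown()  # queued after the message, so it is handled first
    await task
    return bus.published


published = asyncio.run(main())
print(published)
```

Using a sentinel for shutdown lets messages already in the queue drain before the loop exits.
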
Conclusion

Events are a powerful mechanism for creating decoupled, reactive, and maintainable systems. castlecraft-engineer provides the necessary abstractions for both in-process event handling via the EventBus and for integrating with external eventing systems through ExternalEventPublisher and EventStreamConsumer. This allows for building robust applications that can effectively respond to and communicate changes in the domain.