Events, Event Handlers, and Event Buses
Events are a fundamental concept in Domain-Driven Design (DDD) and CQRS architectures. They represent significant occurrences within the domain, typically as a result of a command being successfully processed. castlecraft-engineer provides abstractions to define, handle, and dispatch events both internally and externally.
1. The Event Base Class
All domain events in your application should inherit from the Event base class, located in castlecraft_engineer.abstractions.event.Event.
- Purpose: An Event is a data structure (often a dataclass) that captures the facts about something that has happened in the past. It's a message indicating a state change or a significant moment.
- Characteristics:
  - Named in the past tense (e.g., UserRegisteredEvent, OrderShippedEvent).
  - Should be immutable once created.
  - Carries data relevant to the occurrence but no behavior.
# Example: Defining an Event
from dataclasses import dataclass
import uuid
from datetime import datetime

from castlecraft_engineer.abstractions.event import Event


@dataclass(frozen=True)
class OrderCreatedEvent(Event):
    order_id: uuid.UUID
    customer_id: uuid.UUID
    order_total: float
    created_at: datetime
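The immutability characteristic can be checked directly. The sketch below is a minimal illustration that uses a plain frozen dataclass so it runs without any dependency. PaymentReceivedEvent and try_to_mutate are hypothetical names, and the class deliberately does not inherit from the library's Event base class. A frozen dataclass that does inherit from Event should behave the same way, assuming the base class does not interfere with dataclass freezing.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass, replace


# Hypothetical stand-in event; a real one would inherit from Event.
@dataclass(frozen=True)
class PaymentReceivedEvent:
    payment_id: uuid.UUID
    amount: float


def try_to_mutate(event: PaymentReceivedEvent) -> bool:
    """Return True if the event rejected an in-place change."""
    try:
        event.amount = 0.0  # type: ignore[misc]
    except FrozenInstanceError:
        return True
    return False


event = PaymentReceivedEvent(payment_id=uuid.uuid4(), amount=25.0)
rejected = try_to_mutate(event)

# To "change" an event, build a new one instead of editing in place.
corrected = replace(event, amount=30.0)
```

Because an event records something that already happened, a correction is usually modeled as a new event rather than an edit to the old one, which is what dataclasses.replace mirrors here.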
2. The EventHandler Base Class
For events that require a reaction within the application, you define EventHandlers. These handlers subscribe to specific event types and perform actions when those events are published. They inherit from castlecraft_engineer.abstractions.event_handler.EventHandler[TEvent].
- Purpose: To encapsulate the logic that responds to a specific domain event. This could involve updating a read model, sending a notification, triggering another command, or integrating with other parts of the system.
- Key Method:
  - handle(self, event: TEvent) -> None (or async handle for asynchronous handlers): This method is called by the EventBus when an event of the subscribed type is published. It receives the event object and performs the necessary actions.
# Example: Defining an EventHandler
from castlecraft_engineer.abstractions.event_handler import EventHandler

# Assuming OrderCreatedEvent is defined as above


class SendOrderConfirmationEmailHandler(EventHandler[OrderCreatedEvent]):
    # In a real app, you might inject an email service, logger, etc.
    # def __init__(self, email_service: EmailService):
    #     self.email_service = email_service

    async def handle(self, event: OrderCreatedEvent) -> None:
        print(f"EVENT HANDLED: OrderCreatedEvent for order {event.order_id}")
        print(f"Simulating sending confirmation email to customer {event.customer_id}...")
        # await self.email_service.send_order_confirmation(event.customer_id, event.order_id)
        print(f"Confirmation email for order {event.order_id} would be sent.")
3. The EventBus (In-Process Eventing)
The EventBus (from castlecraft_engineer.abstractions.event_bus.EventBus) is responsible for managing subscriptions and dispatching events to their registered handlers within the same application process.
- Purpose: To decouple event publishers from event subscribers, allowing for more modular and maintainable code.
- Key Functionalities:
  - Handler Registration: Event Handlers are registered with the EventBus to listen for specific Event types. This is typically done at application startup, often via a Dependency Injection (DI) container. castlecraft-engineer's EventBus (when used with ContainerBuilder from common/di.py) uses a method like builder.event_bus.subscribe(EventType, EventHandlerType).
  - Publishing Events:
    - publish(event: Event): Publishes a single event to all subscribed handlers.
    - publish_batch(events: List[Event]): Publishes a list of events. Each event is dispatched to its respective handlers.
  - Error Handling: If an attempt is made to register a handler for an event type that already has a handler registered (and the bus is configured for single handlers per event type, which is common for direct DI resolution), or if other registration issues occur, an EventHandlerRegistrationError (from abstractions/event_bus.py, re-exported in exc.py) might be raised.
# Example: Registering a handler and publishing an event via EventBus
import asyncio
import uuid
from datetime import datetime

from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.event_bus import EventBus

# Assume OrderCreatedEvent and SendOrderConfirmationEmailHandler are defined as above


async def event_bus_example():
    # 1. Configure DI Container
    builder = ContainerBuilder()
    builder.with_event_bus()  # Registers EventBus and allows subscription

    # Register the event handler
    # SendOrderConfirmationEmailHandler might have its own dependencies (e.g., EmailService)
    # builder.register(EmailService, ConcreteEmailService)  # If needed
    builder.event_bus.subscribe(OrderCreatedEvent, SendOrderConfirmationEmailHandler)
    container = builder.build()

    # 2. Resolve EventBus
    event_bus = container.resolve(EventBus)

    # 3. Create and Publish an Event
    order_event = OrderCreatedEvent(
        order_id=uuid.uuid4(),
        customer_id=uuid.uuid4(),
        order_total=99.99,
        created_at=datetime.utcnow(),
    )
    print(f"Publishing event: {order_event}")
    await event_bus.publish(order_event)

    # Example of publishing a batch
    # another_event = OrderCreatedEvent(...)
    # await event_bus.publish_batch([order_event, another_event])


if __name__ == "__main__":
    # To run this example, ensure classes are defined or imported correctly.
    # asyncio.run(event_bus_example())
    pass  # Placeholder for running the example
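To make the decoupling idea concrete without the DI container, here is a dependency-free sketch of type-based dispatch. TinyEventBus, UserRegisteredEvent, and send_welcome are hypothetical names invented for illustration. This is not castlecraft-engineer's EventBus implementation: it registers plain async callables rather than handler classes and skips DI resolution and error handling entirely.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, DefaultDict, List, Type


@dataclass(frozen=True)
class UserRegisteredEvent:  # hypothetical event for this sketch
    username: str


Handler = Callable[[Any], Awaitable[None]]


class TinyEventBus:
    """Illustrative only; not castlecraft-engineer's EventBus."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: Type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        # Dispatch on the event's concrete type; the publisher never
        # needs to know which handlers exist.
        for handler in self._handlers.get(type(event), []):
            await handler(event)

    async def publish_batch(self, events: List[Any]) -> None:
        # Each event goes to its own handlers, in list order.
        for event in events:
            await self.publish(event)


welcomed: List[str] = []


async def send_welcome(event: UserRegisteredEvent) -> None:
    welcomed.append(event.username)


async def demo() -> None:
    bus = TinyEventBus()
    bus.subscribe(UserRegisteredEvent, send_welcome)
    await bus.publish(UserRegisteredEvent(username="ada"))
    await bus.publish_batch(
        [UserRegisteredEvent(username="bob"), UserRegisteredEvent(username="cy")]
    )


asyncio.run(demo())
```

The code that publishes UserRegisteredEvent has no reference to send_welcome, so handlers can be added or removed at startup without touching the publisher.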
4. Integrating Events with Aggregates
Aggregates are often the source of domain events. When an aggregate's state changes due to a command, it records one or more events.
import uuid
from datetime import datetime

from castlecraft_engineer.abstractions.aggregate import Aggregate, Event

# Assume OrderCreatedEvent is defined as above


# Assume OrderModel is a SQLModel for persistence
class OrderModel:
    pass


class Order(Aggregate[uuid.UUID, OrderModel]):
    def __init__(self, id: uuid.UUID, customer_id: uuid.UUID, version: int = 1):
        super().__init__(id, version)
        self.customer_id = customer_id
        self.total = 0.0
        # ... other properties

    @classmethod
    def create_new(cls, customer_id: uuid.UUID, order_total: float) -> "Order":
        order_id = uuid.uuid4()
        order = cls(id=order_id, customer_id=customer_id)
        order.total = order_total
        # Record an event about its creation
        order.record_event(
            OrderCreatedEvent(
                order_id=order.id,
                customer_id=order.customer_id,
                order_total=order.total,
                created_at=datetime.utcnow(),
            )
        )
        return order

    # ... other methods that might record events
These recorded events are stored in the aggregate's uncommitted_events list. They are typically pulled using aggregate.pull_uncommitted_events() and published after the aggregate's state has been successfully persisted to the database. This is often managed by a Unit of Work pattern or within the Command Handler after a successful repository save and session commit.
# Snippet from a Command Handler or Application Service
# async def handle_create_order_command(command: CreateOrderCommand, ...):
#     ...
#     order = Order.create_new(command.customer_id, command.total)
#     await order_repository.save(order, session)
#     await session.commit()  # Critical: commit before publishing events
#     uncommitted_events = order.pull_uncommitted_events()
#     if uncommitted_events:
#         await event_bus.publish_batch(uncommitted_events)
#     ...
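The reason for committing before publishing can be shown with stand-ins. In this sketch, SketchAggregate, FakeSession, ItemAddedEvent, and save_and_publish are all hypothetical. The aggregate only imitates the record/pull behavior described above, and it assumes that pulling events also clears the buffer, which the library's Aggregate may or may not do in exactly this way.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ItemAddedEvent:  # hypothetical event
    sku: str


class SketchAggregate:
    """Stand-in that imitates the record/pull behavior described above."""

    def __init__(self) -> None:
        self._uncommitted: List[object] = []

    def record_event(self, event: object) -> None:
        self._uncommitted.append(event)

    def pull_uncommitted_events(self) -> List[object]:
        # Assumption: pulling returns the events and clears the buffer.
        events, self._uncommitted = self._uncommitted, []
        return events


class FakeSession:
    """Pretend database session whose commit can be made to fail."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail

    async def commit(self) -> None:
        if self.fail:
            raise RuntimeError("commit failed")


published: List[object] = []


async def save_and_publish(aggregate: SketchAggregate, session: FakeSession) -> None:
    await session.commit()  # if this raises, nothing below runs
    for event in aggregate.pull_uncommitted_events():
        published.append(event)  # stands in for event_bus.publish(...)


async def demo() -> bool:
    ok = SketchAggregate()
    ok.record_event(ItemAddedEvent(sku="A-1"))
    await save_and_publish(ok, FakeSession())

    failing = SketchAggregate()
    failing.record_event(ItemAddedEvent(sku="B-2"))
    try:
        await save_and_publish(failing, FakeSession(fail=True))
    except RuntimeError:
        return True  # the failed commit surfaced to the caller
    return False


commit_failure_seen = asyncio.run(demo())
```

Only the event from the successfully committed aggregate reaches the published list, so handlers never react to a state change that was rolled back.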
5. ExternalEventPublisher (Publishing to External Systems)
For scenarios where events need to be sent to external systems (e.g., message brokers like Kafka, RabbitMQ, or other microservices), castlecraft-engineer provides the ExternalEventPublisher abstract base class from castlecraft_engineer.abstractions.event_publisher.ExternalEventPublisher.
- Purpose: To define a contract for publishing events to systems outside the current application process.
- Abstract Methods:
  - async publish(self, event: Event) -> None: Publishes a single event to the external system.
  - async close(self) -> None: Allows for graceful shutdown and cleanup of resources used by the publisher (e.g., closing connections).
- Implementation: You need to create concrete implementations of ExternalEventPublisher tailored to your specific messaging infrastructure.
This publisher is typically registered with the DI container and can be injected into event handlers that need to forward events externally.
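A concrete publisher might look roughly like the sketch below. To keep it runnable without the library or a broker, it defines PublisherContract, a local stand-in with the same two abstract methods listed above, instead of importing ExternalEventPublisher. InMemoryPublisher, InvoiceIssuedEvent, and the JSON envelope shape are likewise hypothetical; a real implementation would hold a broker client and choose its own wire format.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from typing import Any, List


@dataclass(frozen=True)
class InvoiceIssuedEvent:  # hypothetical event
    invoice_no: str
    amount_cents: int


class PublisherContract(ABC):
    """Local stand-in for the publish/close contract described above."""

    @abstractmethod
    async def publish(self, event: Any) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...


class InMemoryPublisher(PublisherContract):
    """Pretends a list is the broker; a real one would hold a client."""

    def __init__(self) -> None:
        self.sent: List[str] = []
        self.closed = False

    async def publish(self, event: Any) -> None:
        if self.closed:
            raise RuntimeError("publisher is closed")
        # Hypothetical envelope: event type name plus its fields.
        payload = {"type": type(event).__name__, "data": asdict(event)}
        self.sent.append(json.dumps(payload, sort_keys=True))

    async def close(self) -> None:
        self.closed = True


async def demo() -> InMemoryPublisher:
    publisher = InMemoryPublisher()
    try:
        await publisher.publish(InvoiceIssuedEvent("INV-7", 1250))
    finally:
        await publisher.close()  # always release resources
    return publisher


publisher = asyncio.run(demo())
```

Calling close in a finally block reflects the graceful-shutdown purpose of that method: connections are released even if publishing raises.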
6. EventStreamConsumer (Consuming from External Systems)
To consume events from external event streams and integrate them into your application, castlecraft-engineer provides the EventStreamConsumer abstract base class from castlecraft_engineer.abstractions.event_consumer.EventStreamConsumer.
- Purpose: To listen to an external event source, deserialize incoming messages into domain events, and typically publish them to the internal EventBus for further processing by local handlers.
- Abstract Methods:
  - async run(self) -> None: Intended to be a long-running method that continuously listens for and processes events from the external stream.
  - async shutdown(self) -> None: For gracefully stopping the consumer and releasing resources.
- Implementation: You need to create concrete implementations specific to the external event source you are consuming from.
An EventStreamConsumer often takes the internal EventBus as a dependency to dispatch consumed events locally.
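The listen, deserialize, and dispatch loop can be sketched with an asyncio.Queue standing in for the external stream. QueueConsumer, StockAdjustedEvent, and the None sentinel used for shutdown are hypothetical choices for this illustration. The class mirrors the run/shutdown shape described above but does not inherit from the library's EventStreamConsumer, and a plain list takes the place of the internal EventBus.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class StockAdjustedEvent:  # hypothetical event
    sku: str
    delta: int


class QueueConsumer:
    """Mirrors the run/shutdown shape above; not the library class."""

    def __init__(
        self, source: "asyncio.Queue[Optional[str]]", sink: List[object]
    ) -> None:
        self._source = source
        self._sink = sink  # stands in for the internal EventBus

    async def run(self) -> None:
        while True:
            raw = await self._source.get()
            if raw is None:  # sentinel placed by shutdown()
                break
            data = json.loads(raw)  # deserialize the incoming message
            self._sink.append(StockAdjustedEvent(**data))

    async def shutdown(self) -> None:
        # Messages queued before the sentinel are still processed.
        await self._source.put(None)


received: List[object] = []


async def demo() -> None:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    consumer = QueueConsumer(queue, received)
    task = asyncio.create_task(consumer.run())
    await queue.put(json.dumps({"sku": "A-1", "delta": -2}))
    await queue.put(json.dumps({"sku": "B-9", "delta": 5}))
    await consumer.shutdown()
    await task  # run() returns once it sees the sentinel


asyncio.run(demo())
```

Using a sentinel lets run drain messages that arrived before shutdown was requested; cancelling the task instead would stop faster but could drop in-flight messages.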
Conclusion
Events are a powerful mechanism for creating decoupled, reactive, and maintainable systems. castlecraft-engineer provides the necessary abstractions for both in-process event handling via the EventBus and for integrating with external eventing systems through ExternalEventPublisher and EventStreamConsumer. This allows for building robust applications that can effectively respond to and communicate changes in the domain.