Sagas: Managing Distributed Transactions

Sagas are a design pattern for maintaining data consistency across multiple services or aggregates in a distributed system, especially when long-lived transactions (LLTs) are involved. They offer an alternative to traditional distributed transactions such as two-phase commit (2PC), which hold locks across participants, are complex to operate, and can reduce system availability.

What is a Saga?

A Saga is a sequence of local transactions. Each local transaction updates data within a single service or aggregate and publishes an event, which triggers the next local transaction in the Saga. If a local transaction fails, the Saga runs compensating transactions that undo the work of the preceding successful local transactions, so the system eventually returns to a consistent state.
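The definition above can be illustrated with a minimal, framework-independent sketch. `SagaStep` and `run_saga` are hypothetical names invented for this illustration and are not part of castlecraft-engineer; each step pairs a local transaction with the compensation that undoes it, and a failure triggers the compensations of the completed steps in reverse order.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    """Hypothetical pairing of a local transaction with its compensation."""
    name: str
    action: Callable[[], None]        # the local transaction
    compensation: Callable[[], None]  # undoes the action if a later step fails


def run_saga(steps: List[SagaStep]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception:
            for done in reversed(completed):
                done.compensation()
            return False
        completed.append(step)
    return True


log: List[str] = []


def fail_inventory() -> None:
    raise RuntimeError("out of stock")


steps = [
    SagaStep("create_order",
             lambda: log.append("order created"),
             lambda: log.append("order cancelled")),
    SagaStep("charge_payment",
             lambda: log.append("payment charged"),
             lambda: log.append("payment refunded")),
    SagaStep("reserve_inventory",
             fail_inventory,
             lambda: log.append("inventory released")),
]

succeeded = run_saga(steps)
# The third step fails, so the two completed steps are compensated in reverse order.
```

In a real saga the steps run in different services and are connected by events rather than direct calls, but the ordering rule for compensation is the same.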

Why Use Sagas?

  • Decentralized Control: Avoids the need for a central transaction coordinator that locks resources across services.
  • Improved Availability: Services can operate more independently. The failure of one service in a saga step doesn't necessarily bring down the entire system; compensation logic can handle it.
  • Loose Coupling: Services communicate via events, reducing direct dependencies.
  • Scalability: Individual services involved in a saga can be scaled independently.

Challenges with Sagas

  • Complexity: Implementing sagas, especially compensation logic, can be complex.
  • Eventual Consistency: Data is only consistent after the saga completes or fully compensates. This might not be suitable for all use cases.
  • Debugging and Testing: Tracing a transaction across multiple services and testing all failure scenarios can be challenging.
  • Idempotency: All steps (and compensations) must be idempotent to handle message redelivery.

Types of Sagas

There are two primary ways to coordinate sagas:

1. Choreography-based Sagas

  • How it works: Each service participating in the saga subscribes to events from other services and knows when to execute its part of the transaction. There's no central coordinator; services react to events they are interested in.
  • Pros:
    • Simple for sagas involving a small number of participants.
    • Truly decentralized, no single point of failure for coordination.
  • Cons:
    • Can become hard to track the overall state of the saga as the number of participants grows.
    • Risk of cyclic dependencies between services.
    • Distributed debugging is more complex.
    • Adding new steps can require changes in multiple services.
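As a rough sketch of choreography, the example below wires three services together through an in-memory event bus. `InMemoryEventBus`, the handler functions, and the `OrderCancelled` event are assumptions made for this illustration; they stand in for a real message broker and real services and are not castlecraft-engineer APIs.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Event = Dict[str, Any]
Handler = Callable[[Event], None]


class InMemoryEventBus:
    """Hypothetical stand-in for a message broker; delivers events synchronously."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.published: List[str] = []  # event types, in publication order

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, event: Event) -> None:
        self.published.append(event_type)
        for handler in self._subscribers[event_type]:
            handler(event)


bus = InMemoryEventBus()


def payment_service(event: Event) -> None:
    # Reacts to OrderCreated; no coordinator tells it what to do.
    if event["amount"] <= 0:
        bus.publish("PaymentFailed", {"order_id": event["order_id"]})
    else:
        bus.publish("PaymentProcessed", {"order_id": event["order_id"]})


def inventory_service(event: Event) -> None:
    # Reacts to PaymentProcessed.
    bus.publish("InventoryReserved", {"order_id": event["order_id"]})


def cancel_order(event: Event) -> None:
    # The order service compensates its own work when payment fails.
    bus.publish("OrderCancelled", {"order_id": event["order_id"]})


bus.subscribe("OrderCreated", payment_service)
bus.subscribe("PaymentProcessed", inventory_service)
bus.subscribe("PaymentFailed", cancel_order)

bus.publish("OrderCreated", {"order_id": "o-1", "amount": 25.0})
```

Note that the overall flow exists only implicitly in the subscriptions, which is why tracking saga state becomes harder as participants are added.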

2. Orchestration-based Sagas

  • How it works: A central orchestrator (the Saga Manager or Saga Orchestrator) tells the participating services what local transactions to execute. The orchestrator manages the entire sequence, sending commands to services and reacting to their outcome events.
  • Pros:
    • Centralized logic makes the saga flow explicit and easier to understand and manage.
    • Easier to track the state of the saga.
    • Simpler to implement complex sagas with many participants or conditional flows.
    • Compensation logic is centralized in the orchestrator.
  • Cons:
    • The orchestrator can become a single point of failure (though it can be made highly available).
    • Risk of the orchestrator becoming a "god object" if it handles too many sagas or too much logic.

Implementing Sagas in castlecraft-engineer

The castlecraft-engineer framework provides a solid foundation for implementing sagas, particularly the orchestration-based approach, due to its CQRS and event-driven architecture.

Key Framework Components to Leverage:

  • Commands and Events (abstractions/command.py, abstractions/event.py): The messages that drive the saga steps.
  • Commands and CommandBus (abstractions/command.py, abstractions/command_bus.py): Sagas dispatch commands to initiate local transactions in different aggregates or services.
  • Events, EventBus, and EventHandlers (abstractions/event.py, abstractions/event_bus.py, abstractions/event_handler.py): Events signal the completion (or failure) of a local transaction and trigger the next step in the saga or a compensation action.
  • Aggregates and Repositories (abstractions/aggregate.py, abstractions/repository.py): Sagas interact with aggregates. You'll also need to persist the state of the saga itself.
  • Dependency Injection (common/di.py): Useful for wiring up saga components.
  • External Event Publisher/Consumer (abstractions/event_publisher.py, abstractions/event_consumer.py): Essential if your saga spans across different microservices that communicate via an external message broker.

Orchestration Approach Example

This is often the recommended approach for its clarity and control.

1. Saga Orchestrator/Manager

  • A dedicated class responsible for defining the steps of the saga.
  • Often implemented as an EventHandler that listens for the initial event that starts the saga.
  • Uses the CommandBus to send commands to other services/aggregates.
  • Uses a repository to load and save the saga's state.

Example: Basic Orchestrator Structure

# In your application's sagas module (e.g., src/your_app/sagas/order_saga.py)

import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from uuid import UUID, uuid4

from castlecraft_engineer.abstractions.command import Command as BaseCommand
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.abstractions.event import Event  # Base Event class
from castlecraft_engineer.abstractions.event_bus import EventBus
from castlecraft_engineer.abstractions.event_handler import EventHandler
from castlecraft_engineer.database.repository import AsyncModelRepository  # Or your custom SagaRepository

# --- Assume these Command/Event/StateModel classes are defined elsewhere and imported ---
# from your_app.commands import ProcessPaymentCommand, ReserveInventoryCommand, RefundPaymentCommand
# from your_app.events import OrderCreatedEvent, PaymentProcessedEvent, PaymentFailedEvent, InventoryReservedEvent, InventoryReservationFailedEvent
# from your_app.sagas.saga_state_model import SagaStateModel # Your SQLModel for saga state
# --- Placeholder definitions for the example to be self-contained for docs ---
# These are simplified placeholders. In a real app, they'd be fully defined.
class SagaStateModel:
    """Placeholder for the SQLModel that represents saga state."""

@dataclass(frozen=True, kw_only=True)
class OrderCreatedEvent(Event):
    order_id: UUID = field(default_factory=uuid4)
    amount: float = 0.0

@dataclass(frozen=True, kw_only=True)
class PaymentProcessedEvent(Event):
    order_id: UUID = field(default_factory=uuid4)
    payment_id: UUID = field(default_factory=uuid4)

@dataclass(frozen=True, kw_only=True)
class PaymentFailedEvent(Event):
    order_id: UUID = field(default_factory=uuid4)
    reason: str = ""

@dataclass(frozen=True)
class ProcessPaymentCommand(BaseCommand):
    order_id: UUID
    amount: float

@dataclass(frozen=True)
class ReserveInventoryCommand(BaseCommand):
    order_id: UUID
    items: list # Example: [{"product_id": "...", "quantity": 1}]

@dataclass(frozen=True)
class RefundPaymentCommand(BaseCommand):
    payment_id: UUID
# --- End Placeholder definitions ---

class OrderProcessingSagaOrchestrator(EventHandler[Event]): # Can listen to a base Event or specific ones
    def __init__(
        self,
        command_bus: CommandBus,
        event_bus: EventBus, # Saga orchestrators often need to publish events too
        saga_repository: AsyncModelRepository[SagaStateModel], # Or specific SagaRepository
        # Inject other dependencies needed by the saga steps
    ):
        self._command_bus = command_bus
        self._event_bus = event_bus
        self._saga_repository = saga_repository
        self._logger = logging.getLogger(self.__class__.__name__)

    async def handle(self, event: Event) -> None:
        self._logger.info(f"Saga Orchestrator received event: {type(event).__name__}")
        if isinstance(event, OrderCreatedEvent):
            await self._handle_order_created(event)
        elif isinstance(event, PaymentProcessedEvent):
            await self._handle_payment_processed(event)
        elif isinstance(event, PaymentFailedEvent):
            await self._handle_payment_failed(event)
        else:
            self._logger.debug(f"Ignoring event {type(event).__name__} not relevant to this orchestrator.")

    async def _handle_order_created(self, event: OrderCreatedEvent):
        self._logger.info(f"Handling OrderCreatedEvent for Order ID: {event.order_id}")
        # 1. Create and persist saga state (simplified, actual persistence via UoW)
        # Example: saga_state = SagaStateModel(
        #     id=uuid4(), saga_type="OrderProcessingSaga", current_step="AWAITING_PAYMENT",
        #     payload={"order_id": str(event.order_id), "amount": event.amount},
        #     compensation_data={}, status="PENDING"
        # )
        # await self._saga_repository.save(session, saga_state) # type: ignore # Note: 'session' handling is simplified in this example.
        # 2. Dispatch first command
        process_payment_command = ProcessPaymentCommand(order_id=event.order_id, amount=event.amount)
        await self._command_bus.execute(process_payment_command)
        self._logger.info(f"Initiated payment processing for order {event.order_id}.")

  • Dispatches commands for each step.
  • Handles outcome events (success or failure) to decide whether to proceed or compensate.
  • Can be implemented as an EventHandler itself (listening for events that initiate or progress the saga) or as a separate service invoked by an initial event.

2. Persisting Saga State

  • Each saga instance needs to maintain its state (e.g., STARTED, PROCESSING_PAYMENT, AWAITING_INVENTORY_CONFIRMATION, COMPENSATING_PAYMENT, COMPLETED, FAILED).
  • This state, along with any data required for compensation (e.g., order ID, payment transaction ID), must be persisted.
  • You can create a new SQLModel for saga state (e.g., SagaStateModel) and use a repository (e.g., AsyncModelRepository) to manage its persistence.
  • Example SagaStateModel fields:
    from typing import Any, Dict
    import uuid

    from sqlalchemy import JSON, Column
    from sqlmodel import Field, SQLModel

    class SagaStateModel(SQLModel, table=True):
        id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
        saga_type: str  # e.g., "OrderProcessingSaga"
        current_step: str
        # Dict fields need an explicit JSON column; SQLModel cannot infer a column type for them.
        payload: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))  # Data carried through the saga
        compensation_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))  # Data needed for compensation
        status: str  # PENDING, COMPLETED, FAILED, COMPENSATING
        version: int = Field(default=0)  # For optimistic concurrency
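One way the version field might be used is sketched below with an in-memory store. `SagaState`, `InMemorySagaStore`, and `StaleSagaStateError` are hypothetical names for this illustration, not framework classes. With a SQL database, the same check is commonly expressed as an UPDATE whose WHERE clause matches both the id and the expected version.

```python
from dataclasses import dataclass, replace
from typing import Dict


class StaleSagaStateError(Exception):
    """Raised when a writer holds an outdated copy of the saga state."""


@dataclass(frozen=True)
class SagaState:
    saga_id: str
    current_step: str
    version: int = 0


class InMemorySagaStore:
    """Hypothetical store that enforces optimistic concurrency on save."""

    def __init__(self) -> None:
        self._rows: Dict[str, SagaState] = {}

    def insert(self, state: SagaState) -> None:
        self._rows[state.saga_id] = state

    def get(self, saga_id: str) -> SagaState:
        return self._rows[saga_id]

    def save(self, state: SagaState) -> SagaState:
        """Write only if the caller's version matches the stored one, then bump it."""
        stored = self._rows[state.saga_id]
        if stored.version != state.version:
            raise StaleSagaStateError(
                f"expected version {state.version}, found {stored.version}"
            )
        updated = replace(state, version=state.version + 1)
        self._rows[state.saga_id] = updated
        return updated


store = InMemorySagaStore()
store.insert(SagaState("saga-1", "AWAITING_PAYMENT"))

# Two handlers load the same saga state concurrently.
first = store.get("saga-1")
second = store.get("saga-1")

store.save(replace(first, current_step="AWAITING_INVENTORY"))
try:
    store.save(replace(second, current_step="PAYMENT_FAILED"))
    conflict = False
except StaleSagaStateError:
    conflict = True  # the second writer must reload and re-evaluate
```

The losing writer reloads the state and decides again, which prevents two concurrent events from both advancing the same saga instance.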
    

3. Saga Steps and Event Handling

  • Initiation: An initial event (e.g., OrderCreatedEvent) triggers the saga orchestrator.
    • The orchestrator creates and persists a new SagaStateModel instance.
    • It dispatches the first command of the saga (e.g., ProcessPaymentCommand) using the CommandBus.
    • Updates saga state (e.g., current_step="AWAITING_PAYMENT").
  • Progression: When an event indicating the outcome of a step arrives (e.g., PaymentProcessedEvent or PaymentFailedEvent):
    • The orchestrator loads the SagaStateModel.
    • If successful, it updates the saga state and dispatches the command for the next step.
    • If failed, it updates the saga state and starts dispatching compensation commands.

Example: Handling Step Completion

# Inside OrderProcessingSagaOrchestrator (continued)

async def _handle_payment_processed(self, event: PaymentProcessedEvent):
    self._logger.info(f"Handling PaymentProcessedEvent for Order ID: {event.order_id}")
    # 1. Load saga state (simplified - assumes a method to find saga by a correlation ID)
    # saga_state: Optional[SagaStateModel] = await self._saga_repository.find_saga_by_order_id(event.order_id) # type: ignore

    # if not saga_state or saga_state.current_step != "AWAITING_PAYMENT":
    #     self._logger.warning(f"Saga state not found or in unexpected step for order {event.order_id}. Ignoring event.")
    #     return

    # 2. Update saga state
    # saga_state.current_step = "AWAITING_INVENTORY"
    # saga_state.payload["payment_id"] = str(event.payment_id)
    # await self._saga_repository.save(session, saga_state) # type: ignore

    # 3. Dispatch the next command
    # order_items = saga_state.payload.get("items", []) # Example: get items from payload
    # reserve_command = ReserveInventoryCommand(order_id=event.order_id, items=order_items)
    # await self._command_bus.execute(reserve_command) # type: ignore

    self._logger.info(f"Dispatched ReserveInventoryCommand for order {event.order_id}.")

Example: Handling Step Failure and Compensation

# Inside OrderProcessingSagaOrchestrator (continued)

async def _handle_payment_failed(self, event: PaymentFailedEvent):
    self._logger.info(f"Handling PaymentFailedEvent for Order ID: {event.order_id}")
    # 1. Load saga state (simplified)
    # saga_state: Optional[SagaStateModel] = await self._saga_repository.find_saga_by_order_id(event.order_id) # type: ignore

    # if not saga_state or saga_state.current_step != "AWAITING_PAYMENT":
    #     self._logger.warning(f"Saga state not found or in unexpected step for order {event.order_id}. Ignoring event.")
    #     return

    # 2. Update saga state to FAILED or COMPENSATING
    # saga_state.status = "FAILED"
    # saga_state.current_step = "PAYMENT_FAILED"
    # await self._saga_repository.save(session, saga_state) # type: ignore

    # 3. Compensation: nothing to undo here, because payment is the first step and it did not succeed.
    #    A failure in a later step (e.g., InventoryReservationFailedEvent) would instead refund
    #    the payment that had already completed:
    #        refund_command = RefundPaymentCommand(payment_id=saga_state.payload["payment_id"])
    #        await self._command_bus.execute(refund_command) # type: ignore

    self._logger.warning(f"Order processing failed for order {event.order_id} due to payment failure.")
    # Optionally publish a saga failure event
    # from your_app.events import OrderProcessingFailedEvent
    # await self._event_bus.publish(OrderProcessingFailedEvent(order_id=event.order_id)) # type: ignore

Conceptual Flow (Recap)

  1. Initiation: An event (e.g., OrderCreatedEvent) starts the saga.
  2. Orchestrator: Creates saga state, dispatches the first command (e.g., ProcessPaymentCommand).
  3. Service: Executes local transaction, publishes outcome event (e.g., PaymentProcessedEvent or PaymentFailedEvent).
  4. Orchestrator: Handles outcome. If success, proceeds to next step (e.g., dispatch ReserveInventoryCommand). If failure, initiates compensation.
  5. Compensation: Orchestrator dispatches compensation commands in reverse order of successful steps (e.g., RefundPaymentCommand).

4. Compensation Logic

  • For each step that can be compensated, define a corresponding compensation command (e.g., RefundPaymentCommand to compensate ProcessPaymentCommand).
  • The saga orchestrator must know which compensation command to issue for each completed step, typically in reverse order.
  • Store necessary data for compensation (e.g., transaction IDs) in the SagaStateModel.compensation_data.
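The reverse-order rule above could be sketched as a small registry that maps each completed step to a function building its compensation command. Commands are represented here as plain `(name, payload)` tuples to keep the example self-contained. The step names, `plan_compensation`, and `ReleaseInventoryCommand` are assumptions for illustration only.

```python
from typing import Any, Callable, Dict, List, Tuple

Command = Tuple[str, Dict[str, Any]]  # (command name, payload)

# Hypothetical registry: completed step -> builder for its compensation command.
COMPENSATIONS: Dict[str, Callable[[Dict[str, Any]], Command]] = {
    "PAYMENT_PROCESSED": lambda data: (
        "RefundPaymentCommand",
        {"order_id": data["order_id"], "payment_id": data["payment_id"]},
    ),
    "INVENTORY_RESERVED": lambda data: (
        "ReleaseInventoryCommand",  # hypothetical command, not defined in this guide
        {"order_id": data["order_id"]},
    ),
}


def plan_compensation(
    completed_steps: List[str], compensation_data: Dict[str, Any]
) -> List[Command]:
    """Return compensation commands for completed steps, most recent step first."""
    commands: List[Command] = []
    for step in reversed(completed_steps):
        build = COMPENSATIONS.get(step)
        if build is not None:  # steps without side effects need no compensation
            commands.append(build(compensation_data))
    return commands


plan = plan_compensation(
    ["ORDER_VALIDATED", "PAYMENT_PROCESSED", "INVENTORY_RESERVED"],
    {"order_id": "o-1", "payment_id": "p-9"},
)
```

Keeping the mapping in one place makes it easier to verify that every compensable step has a matching compensation command.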

5. Idempotency

  • Ensure that command handlers and event handlers involved in the saga are idempotent. This is critical because in a distributed system, messages (commands or events) might be redelivered.
  • The saga orchestrator should also be idempotent in its state transitions and command dispatches.
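A common way to make a handler idempotent is to record the IDs of messages it has already applied and skip repeats. The sketch below assumes each message carries a unique `message_id`; `PaymentHandler` is a hypothetical class, and the in-memory set stands in for a durable store.

```python
from typing import Dict, Set


class PaymentHandler:
    """Hypothetical handler that ignores redelivered messages."""

    def __init__(self) -> None:
        self._processed_ids: Set[str] = set()  # in production: a durable store
        self.charges: Dict[str, float] = {}

    def handle(self, message_id: str, order_id: str, amount: float) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        if message_id in self._processed_ids:
            return False
        self.charges[order_id] = self.charges.get(order_id, 0.0) + amount
        self._processed_ids.add(message_id)
        return True


handler = PaymentHandler()
first = handler.handle("msg-1", "order-1", 25.0)
second = handler.handle("msg-1", "order-1", 25.0)  # redelivery of the same message
```

For this to be reliable, the processed-ID record and the business change should be written in the same local transaction, so a crash cannot leave one without the other.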

Example Saga Flow: Order Processing

  1. Client: Submits CreateOrderCommand.
  2. Order Service: Handles command, creates order, publishes OrderCreatedEvent.
  3. OrderProcessingSagaOrchestrator: Handles OrderCreatedEvent.
    • Creates Saga state (Status: PENDING, Step: AWAITING_PAYMENT).
    • Dispatches ProcessPaymentCommand(order_id, amount).
  4. Payment Service: Handles ProcessPaymentCommand.
    • If successful, publishes PaymentProcessedEvent(order_id, payment_id).
    • If failed, publishes PaymentFailedEvent(order_id, reason).
  5. OrderProcessingSagaOrchestrator: Handles payment outcome event.
    • On PaymentProcessedEvent:
      • Updates Saga state (Step: AWAITING_INVENTORY, add payment_id to compensation data).
      • Dispatches ReserveInventoryCommand(order_id, items).
    • On PaymentFailedEvent:
      • Updates Saga state (Status: FAILED, Step: PAYMENT_FAILED).
      • (No compensation needed yet for this step if it's the first real action).
      • Publishes OrderProcessingFailedEvent.
  6. Inventory Service: Handles ReserveInventoryCommand.
    • If successful, publishes InventoryReservedEvent(order_id).
    • If failed, publishes InventoryReservationFailedEvent(order_id, reason).
  7. OrderProcessingSagaOrchestrator: Handles inventory outcome event.
    • On InventoryReservedEvent:
      • Updates Saga state (Status: COMPLETED, Step: INVENTORY_RESERVED).
      • Publishes OrderProcessingCompletedEvent.
    • On InventoryReservationFailedEvent:
      • Updates Saga state (Status: COMPENSATING, Step: INVENTORY_FAILED).
      • Dispatches RefundPaymentCommand(order_id, payment_id) (using data from saga state).
  8. Payment Service: Handles RefundPaymentCommand.
    • Publishes PaymentRefundedEvent or PaymentRefundFailedEvent.
  9. OrderProcessingSagaOrchestrator: Handles refund outcome event.
    • On PaymentRefundedEvent:
      • Updates Saga state (Status: FAILED, Step: PAYMENT_REFUNDED_COMPENSATION_COMPLETED).
      • Publishes OrderProcessingFailedEvent.
    • On PaymentRefundFailedEvent:
      • Updates Saga state (Status: COMPENSATION_FAILED). Critical error, requires manual intervention.
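The flow above can be condensed into a transition table keyed by the current step and the incoming event. This is only a sketch: `Transition`, `TRANSITIONS`, and `replay` are hypothetical names, events and commands are represented by their names as strings, and leaving the step unchanged on PaymentRefundFailedEvent is an assumption, since the flow above specifies only the status for that case.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class Transition(NamedTuple):
    next_step: str
    status: str
    dispatch: Optional[str]  # name of the command or event to emit, if any


TRANSITIONS: Dict[Tuple[str, str], Transition] = {
    ("AWAITING_PAYMENT", "PaymentProcessedEvent"):
        Transition("AWAITING_INVENTORY", "PENDING", "ReserveInventoryCommand"),
    ("AWAITING_PAYMENT", "PaymentFailedEvent"):
        Transition("PAYMENT_FAILED", "FAILED", "OrderProcessingFailedEvent"),
    ("AWAITING_INVENTORY", "InventoryReservedEvent"):
        Transition("INVENTORY_RESERVED", "COMPLETED", "OrderProcessingCompletedEvent"),
    ("AWAITING_INVENTORY", "InventoryReservationFailedEvent"):
        Transition("INVENTORY_FAILED", "COMPENSATING", "RefundPaymentCommand"),
    ("INVENTORY_FAILED", "PaymentRefundedEvent"):
        Transition("PAYMENT_REFUNDED_COMPENSATION_COMPLETED", "FAILED",
                   "OrderProcessingFailedEvent"),
    ("INVENTORY_FAILED", "PaymentRefundFailedEvent"):
        Transition("INVENTORY_FAILED", "COMPENSATION_FAILED", None),
}


def replay(events: List[str]) -> Tuple[str, str, List[str]]:
    """Apply events to a new saga; return (step, status, dispatched messages)."""
    step, status = "AWAITING_PAYMENT", "PENDING"
    dispatched = ["ProcessPaymentCommand"]  # sent when OrderCreatedEvent arrives
    for event in events:
        transition = TRANSITIONS.get((step, event))
        if transition is None:
            continue  # unexpected or duplicate event for the current step: ignore
        step, status = transition.next_step, transition.status
        if transition.dispatch:
            dispatched.append(transition.dispatch)
    return step, status, dispatched


happy_path = replay(["PaymentProcessedEvent", "InventoryReservedEvent"])
compensated = replay([
    "PaymentProcessedEvent",
    "InventoryReservationFailedEvent",
    "PaymentRefundedEvent",
])
```

Because events that do not match the current step are ignored, a redelivered PaymentProcessedEvent does not dispatch ReserveInventoryCommand a second time.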

Suggested Directory Structure

You might consider a dedicated sagas module within your application or framework:

src/your_app_or_castlecraft_engineer/
├── ...
├── sagas/
│   ├── __init__.py
│   ├── base_saga_orchestrator.py # Abstract base for orchestrators
│   ├── saga_state_model.py       # SQLModel for saga state
│   ├── saga_repository.py        # Repository for SagaStateModel
│   └── exceptions.py             # Saga specific exceptions
└── ...

Alternatively, if sagas are highly domain-specific, they can live within your application's domain modules.

Considerations for Sagas in castlecraft-engineer

  • In-Process vs. Distributed Sagas:
    • In-Process: If all aggregates/services involved are within the same application process, the framework's internal EventBus and CommandBus can be used directly.
    • Distributed: If the saga spans microservices, you'll need to leverage ExternalEventPublisher to send events/commands to a message broker (e.g., Kafka, RabbitMQ) and EventStreamConsumer (or similar) in other services (or the saga orchestrator if it's a separate service) to consume these messages. The orchestrator would then use its local CommandBus to execute local transactions based on consumed events.
  • Atomicity of Saga State Updates: Ensure that updating the saga's state and dispatching the next command/event happen atomically or in a way that's resilient to failures. The version field in your SagaStateModel can be used for optimistic concurrency control.
  • Error Handling and Retries: Implement robust error handling. For transient failures, consider retry mechanisms. For persistent failures, the saga should move to a failed or compensating state.
  • Testing Sagas: Testing sagas involves verifying both the "happy path" and various failure/compensation scenarios. This can involve unit tests for individual saga steps and integration tests for the overall flow.
  • Observability: Implement comprehensive logging and tracing across saga steps. Distributed tracing tools are invaluable.

Code Examples: Commands and Events

Here are examples of the Commands and Events that would drive the Order Processing Saga:

Commands

# In your application's commands module (e.g., src/your_app/commands.py)

from dataclasses import dataclass
from uuid import UUID
from typing import List, Dict, Any
from castlecraft_engineer.abstractions.command import Command

@dataclass(frozen=True)
class ProcessPaymentCommand(Command):
    order_id: UUID
    amount: float

@dataclass(frozen=True)
class ReserveInventoryCommand(Command):
    order_id: UUID
    items: List[Dict[str, Any]] # Simplified; could be a list of Value Objects

@dataclass(frozen=True)
class RefundPaymentCommand(Command):
    order_id: UUID # May not be needed if payment_id is globally unique and sufficient
    payment_id: UUID

Events

# In your application's events module (e.g., src/your_app/events.py)

from dataclasses import dataclass
from uuid import UUID
from castlecraft_engineer.abstractions.event import Event

@dataclass(frozen=True, kw_only=True)
class OrderCreatedEvent(Event):
    order_id: UUID
    amount: float
    # Include other relevant order details

@dataclass(frozen=True, kw_only=True)
class PaymentProcessedEvent(Event):
    order_id: UUID
    payment_id: UUID

@dataclass(frozen=True, kw_only=True)
class PaymentFailedEvent(Event):
    order_id: UUID
    reason: str

# ... other saga events like InventoryReservedEvent, InventoryReservationFailedEvent, etc.

When to Use Sagas (and When Not To)

Use Sagas When:

  • You need to maintain data consistency across multiple services or aggregates in a distributed environment.
  • A transaction is long-lived and holding locks across services (as in 2PC) would reduce availability.
  • Eventual consistency is an acceptable trade-off for the business process.
  • You want to achieve loose coupling between services.

Avoid Sagas When:

  • Strong, immediate consistency (ACID properties) across all operations is a strict requirement.
  • The transaction is simple and confined to a single service or aggregate.
  • The complexity of implementing and managing sagas outweighs the benefits for a particular use case.

By carefully considering these aspects, you can effectively leverage the Saga pattern within the castlecraft-engineer framework to build robust and resilient distributed applications.