Skip to content

castlecraft_engineer.abstractions

castlecraft_engineer.abstractions

Aggregate

Bases: Generic[TAggregateId], ABC

Base class for Aggregate Roots.

Source code in src/castlecraft_engineer/abstractions/aggregate.py
class Aggregate(Generic[TAggregateId], abc.ABC):
    """Base class for Aggregate Roots."""

    def __init__(self, id: TAggregateId, version: int = -1) -> None:
        if id is None:
            raise ValueError("Aggregate ID required.")
        self._id: TAggregateId = id
        # version for optimistic concurrency
        self._version: int = version
        self._uncommitted_events: List[Event] = []

    @property
    def id(self) -> TAggregateId:
        return self._id

    @property
    def version(self) -> int:
        """Current version for optimistic concurrency."""
        return self._version

    @property
    def uncommitted_events(self) -> List[Event]:
        """Events recorded since last save."""
        return self._uncommitted_events

    def _record_event(self, event: Event) -> None:
        """Records a domain event."""
        if not isinstance(event, Event):
            raise TypeError("Can only record Event instances.")
        self._uncommitted_events.append(event)

    def pull_uncommitted_events(self) -> List[Event]:
        """Retrieves and clears uncommitted events."""
        events = self._uncommitted_events[:]
        self._uncommitted_events.clear()
        return events

    def _increment_version(self) -> None:
        self._version += 1

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.id == other.id

    def __hash__(self) -> int:
        return hash(self.id)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}(id={self.id!r}, version={self.version})>"  # noqa: E501

uncommitted_events property

Events recorded since last save.

version property

Current version for optimistic concurrency.

pull_uncommitted_events()

Retrieves and clears uncommitted events.

Source code in src/castlecraft_engineer/abstractions/aggregate.py
def pull_uncommitted_events(self) -> List[Event]:
    """Retrieves and clears uncommitted events."""
    events = self._uncommitted_events[:]
    self._uncommitted_events.clear()
    return events

AggregateRepository

Bases: _RepositoryBase[TAggregateId, TAggregate, TSQLModel]

Generic repository mapping Aggregates to SQLModels.

Handles persistence for Aggregates, potentially spanning multiple related SQLModels if relationships and cascades are configured correctly on the SQLModel classes themselves.

Relies on the SQLAlchemy Unit of Work pattern managed outside the repository (e.g., in a Command Handler or Application Service).

Source code in src/castlecraft_engineer/abstractions/repository.py
class AggregateRepository(
    _RepositoryBase[
        TAggregateId,
        TAggregate,
        TSQLModel,
    ],
):
    """
    Generic repository mapping Aggregates to SQLModels.

    Handles persistence for Aggregates, potentially spanning multiple
    related SQLModels if relationships and cascades are configured
    correctly on the SQLModel classes themselves.

    Relies on the SQLAlchemy Unit of Work pattern managed outside
    the repository (e.g., in a Command Handler or Application Service).
    """

    def __init__(
        self,
        aggregate_cls: Type[TAggregate],
        model_cls: Type[TSQLModel],
    ):
        super().__init__(aggregate_cls, model_cls)

    def get_by_id(
        self,
        session: Session,
        id: TAggregateId,
    ) -> Optional[TAggregate]:
        """
        Retrieves an Aggregate by its ID.
        Loads the root model and relies on ORM relationship loading
        (eager or lazy) for related data.
        """
        self._logger.debug(f"Getting aggregate by ID: {id}")
        model = session.get(self.model_cls, id)
        if not model:
            self._logger.warning(
                f"Aggregate ID {id} not found in database for root model {self.model_cls.__name__}."  # noqa: E501
            )
            return None

        self._logger.debug(
            f"Found root model for ID {id}. Mapping to aggregate.",
        )
        return self._map_model_to_aggregate(model)

    def save(self, session: Session, aggregate: TAggregate) -> TAggregate:
        """
        Persists Aggregate state (handles create or update).

        Relies on the provided session being managed externally (Unit of Work).
        Adds new aggregates to the session or updates existing ones.
        Handles optimistic concurrency checking based on the 'version' field
        of the root model.

        For updates involving related models (multiple tables), ensure
        SQLModel relationships and cascade options are correctly configured.
        The mapping logic (`_map_aggregate_to_model`) should update the
        state of the model instances, and the SQLAlchemy session flush
        will handle persisting those changes.
        """
        if not isinstance(aggregate, self.aggregate_cls):
            raise TypeError(
                f"Input must be an instance of {self.aggregate_cls.__name__}, got {type(aggregate).__name__}"  # noqa: E501
            )

        agg_id = aggregate.id
        current_agg_version = aggregate.version
        is_new = current_agg_version == -1

        self._logger.debug(
            f"Attempting to save aggregate ID: {agg_id}, Current Aggregate Version: {current_agg_version}, Is New: {is_new}"  # noqa: E501
        )

        model: Optional[TSQLModel] = None  # Declare model once with its optional type
        try:
            if is_new:
                self._logger.debug(
                    f"Aggregate ID {agg_id} is new. Creating root model."
                )
                model = self.model_cls(id=agg_id, version=-1)
                self._map_aggregate_to_model(aggregate, model)
                # Add to session first.
                session.add(model)
                # If add was successful, update versions.
                aggregate._increment_version()
                model.version = aggregate.version
                self._logger.info(
                    f"Added new aggregate ID: {agg_id} to session with version {aggregate.version}. Commit required externally."  # noqa: E501
                )
            else:
                self._logger.debug(
                    f"Aggregate ID {agg_id} exists. Loading root model for update."  # noqa: E501
                )
                model = session.get(self.model_cls, agg_id)

                if not model:
                    self._logger.error(
                        f"Aggregate ID {agg_id} not found in database during update attempt."  # noqa: E501
                    )
                    raise AggregateNotFoundError(agg_id)

                db_version = getattr(model, "version", None)
                if db_version is None:
                    raise RepositoryError(
                        f"Database model {self.model_cls.__name__} for ID {agg_id} is missing 'version'. Cannot perform optimistic lock."  # noqa: E501
                    )

                if db_version != current_agg_version:
                    self._logger.warning(
                        f"Optimistic lock failed for ID: {agg_id}. Expected DB version {current_agg_version}, found {db_version}."  # noqa: E501
                    )
                    raise OptimisticConcurrencyError(
                        agg_id, current_agg_version, db_version
                    )
                self._logger.debug(
                    f"Optimistic lock check passed for ID {agg_id} (Version: {current_agg_version})."  # noqa: E501
                )

                self._map_aggregate_to_model(aggregate, model)
                aggregate._increment_version()
                model.version = aggregate.version
                self._logger.info(
                    f"Updated aggregate ID: {agg_id} in session to version {aggregate.version}. Commit required externally."  # noqa: E501
                )
            return aggregate

        except (OptimisticConcurrencyError, AggregateNotFoundError) as e:
            self._logger.error(f"Save failed for aggregate ID {agg_id}: {e!s}")
            raise e
        except Exception as e:
            self._logger.exception(
                f"Unexpected error during save for aggregate ID {agg_id}: {e!s}"  # noqa: E501
            )
            raise RepositoryError(
                f"Save failed for aggregate ID {agg_id}: {e!s}"
            ) from e

    def delete_by_id(self, session: Session, id: TAggregateId) -> bool:
        """
        Deletes an Aggregate by its ID.
        Relies on the ORM's cascade delete configuration for related models.
        """
        self._logger.debug(f"Attempting to delete aggregate ID: {id}")
        model = session.get(self.model_cls, id)
        if not model:
            self._logger.warning(f"Aggregate ID: {id} not found for deletion.")
            return False

        try:
            session.delete(model)
            self._logger.info(
                f"Marked aggregate ID: {id} for deletion in session. Commit required externally."  # noqa: E501
            )
            return True
        except Exception as e:
            self._logger.exception(
                f"Delete failed for aggregate ID {id}: {e!s}",
            )
            raise RepositoryError(f"Delete failed for {id}: {e!s}") from e

delete_by_id(session, id)

Deletes an Aggregate by its ID. Relies on the ORM's cascade delete configuration for related models.

Source code in src/castlecraft_engineer/abstractions/repository.py
def delete_by_id(self, session: Session, id: TAggregateId) -> bool:
    """
    Deletes an Aggregate by its ID.
    Relies on the ORM's cascade delete configuration for related models.
    """
    self._logger.debug(f"Attempting to delete aggregate ID: {id}")
    model = session.get(self.model_cls, id)
    if not model:
        self._logger.warning(f"Aggregate ID: {id} not found for deletion.")
        return False

    try:
        session.delete(model)
        self._logger.info(
            f"Marked aggregate ID: {id} for deletion in session. Commit required externally."  # noqa: E501
        )
        return True
    except Exception as e:
        self._logger.exception(
            f"Delete failed for aggregate ID {id}: {e!s}",
        )
        raise RepositoryError(f"Delete failed for {id}: {e!s}") from e

get_by_id(session, id)

Retrieves an Aggregate by its ID. Loads the root model and relies on ORM relationship loading (eager or lazy) for related data.

Source code in src/castlecraft_engineer/abstractions/repository.py
def get_by_id(
    self,
    session: Session,
    id: TAggregateId,
) -> Optional[TAggregate]:
    """
    Retrieves an Aggregate by its ID.
    Loads the root model and relies on ORM relationship loading
    (eager or lazy) for related data.
    """
    self._logger.debug(f"Getting aggregate by ID: {id}")
    model = session.get(self.model_cls, id)
    if not model:
        self._logger.warning(
            f"Aggregate ID {id} not found in database for root model {self.model_cls.__name__}."  # noqa: E501
        )
        return None

    self._logger.debug(
        f"Found root model for ID {id}. Mapping to aggregate.",
    )
    return self._map_model_to_aggregate(model)

save(session, aggregate)

Persists Aggregate state (handles create or update).

Relies on the provided session being managed externally (Unit of Work). Adds new aggregates to the session or updates existing ones. Handles optimistic concurrency checking based on the 'version' field of the root model.

For updates involving related models (multiple tables), ensure SQLModel relationships and cascade options are correctly configured. The mapping logic (_map_aggregate_to_model) should update the state of the model instances, and the SQLAlchemy session flush will handle persisting those changes.

Source code in src/castlecraft_engineer/abstractions/repository.py
def save(self, session: Session, aggregate: TAggregate) -> TAggregate:
    """
    Persists Aggregate state (handles create or update).

    Relies on the provided session being managed externally (Unit of Work).
    Adds new aggregates to the session or updates existing ones.
    Handles optimistic concurrency checking based on the 'version' field
    of the root model.

    For updates involving related models (multiple tables), ensure
    SQLModel relationships and cascade options are correctly configured.
    The mapping logic (`_map_aggregate_to_model`) should update the
    state of the model instances, and the SQLAlchemy session flush
    will handle persisting those changes.
    """
    if not isinstance(aggregate, self.aggregate_cls):
        raise TypeError(
            f"Input must be an instance of {self.aggregate_cls.__name__}, got {type(aggregate).__name__}"  # noqa: E501
        )

    agg_id = aggregate.id
    current_agg_version = aggregate.version
    is_new = current_agg_version == -1

    self._logger.debug(
        f"Attempting to save aggregate ID: {agg_id}, Current Aggregate Version: {current_agg_version}, Is New: {is_new}"  # noqa: E501
    )

    model: Optional[TSQLModel] = None  # Declare model once with its optional type
    try:
        if is_new:
            self._logger.debug(
                f"Aggregate ID {agg_id} is new. Creating root model."
            )
            model = self.model_cls(id=agg_id, version=-1)
            self._map_aggregate_to_model(aggregate, model)
            # Add to session first.
            session.add(model)
            # If add was successful, update versions.
            aggregate._increment_version()
            model.version = aggregate.version
            self._logger.info(
                f"Added new aggregate ID: {agg_id} to session with version {aggregate.version}. Commit required externally."  # noqa: E501
            )
        else:
            self._logger.debug(
                f"Aggregate ID {agg_id} exists. Loading root model for update."  # noqa: E501
            )
            model = session.get(self.model_cls, agg_id)

            if not model:
                self._logger.error(
                    f"Aggregate ID {agg_id} not found in database during update attempt."  # noqa: E501
                )
                raise AggregateNotFoundError(agg_id)

            db_version = getattr(model, "version", None)
            if db_version is None:
                raise RepositoryError(
                    f"Database model {self.model_cls.__name__} for ID {agg_id} is missing 'version'. Cannot perform optimistic lock."  # noqa: E501
                )

            if db_version != current_agg_version:
                self._logger.warning(
                    f"Optimistic lock failed for ID: {agg_id}. Expected DB version {current_agg_version}, found {db_version}."  # noqa: E501
                )
                raise OptimisticConcurrencyError(
                    agg_id, current_agg_version, db_version
                )
            self._logger.debug(
                f"Optimistic lock check passed for ID {agg_id} (Version: {current_agg_version})."  # noqa: E501
            )

            self._map_aggregate_to_model(aggregate, model)
            aggregate._increment_version()
            model.version = aggregate.version
            self._logger.info(
                f"Updated aggregate ID: {agg_id} in session to version {aggregate.version}. Commit required externally."  # noqa: E501
            )
        return aggregate

    except (OptimisticConcurrencyError, AggregateNotFoundError) as e:
        self._logger.error(f"Save failed for aggregate ID {agg_id}: {e!s}")
        raise e
    except Exception as e:
        self._logger.exception(
            f"Unexpected error during save for aggregate ID {agg_id}: {e!s}"  # noqa: E501
        )
        raise RepositoryError(
            f"Save failed for aggregate ID {agg_id}: {e!s}"
        ) from e

AsyncAggregateRepository

Bases: _RepositoryBase[TAggregateId, TAggregate, TSQLModel]

Generic asynchronous repository mapping Aggregates to SQLModels using AsyncSession.

Source code in src/castlecraft_engineer/abstractions/repository.py
class AsyncAggregateRepository(
    _RepositoryBase[
        TAggregateId,
        TAggregate,
        TSQLModel,
    ],
):
    """
    Generic asynchronous repository mapping
    Aggregates to SQLModels using AsyncSession.
    """

    def __init__(
        self,
        aggregate_cls: Type[TAggregate],
        model_cls: Type[TSQLModel],
    ):
        super().__init__(aggregate_cls, model_cls)

    # _map_model_to_aggregate and _map_aggregate_to_model are inherited

    async def get_by_id(
        self,
        session: AsyncSession,
        id: TAggregateId,
    ) -> Optional[TAggregate]:
        """
        Asynchronously retrieves an Aggregate by its ID using AsyncSession.
        """
        self._logger.debug(f"Getting aggregate by ID: {id}")
        model: Optional[TSQLModel] = await session.get(self.model_cls, id)
        if not model:
            self._logger.warning(
                f"Aggregate ID {id} not found in database for root model {self.model_cls.__name__}."  # noqa: 501
            )
            return None

        self._logger.debug(
            f"Found root model for ID {id}. Mapping to aggregate.",
        )
        return self._map_model_to_aggregate(model)

    async def save(
        self,
        session: AsyncSession,
        aggregate: TAggregate,
    ) -> TAggregate:
        """
        Asynchronously persists Aggregate state using AsyncSession.
        """
        if not isinstance(aggregate, self.aggregate_cls):
            raise TypeError(
                f"Input must be an instance of {self.aggregate_cls.__name__}, got {type(aggregate).__name__}"  # noqa: 501
            )

        agg_id = aggregate.id
        current_agg_version = aggregate.version
        is_new = current_agg_version == -1

        self._logger.debug(
            f"Attempting to save aggregate ID: {agg_id}, Current Aggregate Version: {current_agg_version}, Is New: {is_new}"  # noqa: 501
        )

        model: Optional[TSQLModel] = None  # Declare model once with its optional type
        try:
            if is_new:
                self._logger.debug(
                    f"Aggregate ID {agg_id} is new. Creating root model."
                )
                model = self.model_cls(id=agg_id, version=-1)
                self._map_aggregate_to_model(aggregate, model)
                # Add to session first.
                session.add(model)
                # If add was successful, update versions.
                aggregate._increment_version()
                model.version = aggregate.version
                self._logger.info(
                    f"Added new aggregate ID: {agg_id} to session with version {aggregate.version}. Commit required externally."  # noqa: 501
                )
            else:
                self._logger.debug(
                    f"Aggregate ID {agg_id} exists. Loading root model for update."  # noqa: E501
                )
                model = await session.get(self.model_cls, agg_id)

                if not model:
                    self._logger.error(
                        f"Aggregate ID {agg_id} not found in database during update attempt."  # noqa: 501
                    )
                    raise AggregateNotFoundError(agg_id)

                db_version = getattr(model, "version", None)
                if db_version is None:
                    raise RepositoryError(
                        f"Database model {self.model_cls.__name__} for ID {agg_id} is missing 'version'. Cannot perform optimistic lock."  # noqa: 501
                    )

                if db_version != current_agg_version:
                    self._logger.warning(
                        f"Optimistic lock failed for ID: {agg_id}. Expected DB version {current_agg_version}, found {db_version}."  # noqa: 501
                    )
                    raise OptimisticConcurrencyError(
                        agg_id, current_agg_version, db_version
                    )
                self._logger.debug(
                    f"Optimistic lock check passed for ID {agg_id} (Version: {current_agg_version})."  # noqa: 501
                )

                self._map_aggregate_to_model(aggregate, model)
                aggregate._increment_version()
                model.version = aggregate.version

                self._logger.info(
                    f"Updated aggregate ID: {agg_id} in session to version {aggregate.version}. Commit required externally."  # noqa: 501
                )
            return aggregate

        except (OptimisticConcurrencyError, AggregateNotFoundError) as e:
            self._logger.error(f"Save failed for aggregate ID {agg_id}: {e!s}")
            raise e
        except Exception as e:
            self._logger.exception(
                f"Unexpected error during save for aggregate ID {agg_id}: {e!s}"  # noqa: 501
            )
            raise RepositoryError(
                f"Save failed for aggregate ID {agg_id}: {e!s}"
            ) from e

    async def delete_by_id(
        self,
        session: AsyncSession,
        id: TAggregateId,
    ) -> bool:
        """
        Asynchronously deletes an Aggregate by its ID using AsyncSession.
        """
        self._logger.debug(f"Attempting to delete aggregate ID: {id}")
        model: Optional[TSQLModel] = await session.get(self.model_cls, id)
        if not model:
            self._logger.warning(f"Aggregate ID: {id} not found for deletion.")
            return False

        try:
            # For async, session.delete is synchronous, flush/commit is async
            # However, SQLModel/SQLAlchemy handles this correctly.
            # The actual delete operation is usually part of the flush.
            await session.delete(model)  # This marks the object for deletion.
            self._logger.info(
                f"Marked aggregate ID: {id} for deletion in session. Commit/flush required externally."  # noqa: 501
            )
            return True
        except Exception as e:
            self._logger.exception(
                f"Delete failed for aggregate ID {id}: {e!s}",
            )
            raise RepositoryError(f"Delete failed for {id}: {e!s}") from e

delete_by_id(session, id) async

Asynchronously deletes an Aggregate by its ID using AsyncSession.

Source code in src/castlecraft_engineer/abstractions/repository.py
async def delete_by_id(
    self,
    session: AsyncSession,
    id: TAggregateId,
) -> bool:
    """
    Asynchronously deletes an Aggregate by its ID using AsyncSession.
    """
    self._logger.debug(f"Attempting to delete aggregate ID: {id}")
    model: Optional[TSQLModel] = await session.get(self.model_cls, id)
    if not model:
        self._logger.warning(f"Aggregate ID: {id} not found for deletion.")
        return False

    try:
        # For async, session.delete is synchronous, flush/commit is async
        # However, SQLModel/SQLAlchemy handles this correctly.
        # The actual delete operation is usually part of the flush.
        await session.delete(model)  # This marks the object for deletion.
        self._logger.info(
            f"Marked aggregate ID: {id} for deletion in session. Commit/flush required externally."  # noqa: 501
        )
        return True
    except Exception as e:
        self._logger.exception(
            f"Delete failed for aggregate ID {id}: {e!s}",
        )
        raise RepositoryError(f"Delete failed for {id}: {e!s}") from e

get_by_id(session, id) async

Asynchronously retrieves an Aggregate by its ID using AsyncSession.

Source code in src/castlecraft_engineer/abstractions/repository.py
async def get_by_id(
    self,
    session: AsyncSession,
    id: TAggregateId,
) -> Optional[TAggregate]:
    """
    Asynchronously retrieves an Aggregate by its ID using AsyncSession.
    """
    self._logger.debug(f"Getting aggregate by ID: {id}")
    model: Optional[TSQLModel] = await session.get(self.model_cls, id)
    if not model:
        self._logger.warning(
            f"Aggregate ID {id} not found in database for root model {self.model_cls.__name__}."  # noqa: 501
        )
        return None

    self._logger.debug(
        f"Found root model for ID {id}. Mapping to aggregate.",
    )
    return self._map_model_to_aggregate(model)

save(session, aggregate) async

Asynchronously persists Aggregate state using AsyncSession.

Source code in src/castlecraft_engineer/abstractions/repository.py
async def save(
    self,
    session: AsyncSession,
    aggregate: TAggregate,
) -> TAggregate:
    """
    Asynchronously persists Aggregate state using AsyncSession.
    """
    if not isinstance(aggregate, self.aggregate_cls):
        raise TypeError(
            f"Input must be an instance of {self.aggregate_cls.__name__}, got {type(aggregate).__name__}"  # noqa: 501
        )

    agg_id = aggregate.id
    current_agg_version = aggregate.version
    is_new = current_agg_version == -1

    self._logger.debug(
        f"Attempting to save aggregate ID: {agg_id}, Current Aggregate Version: {current_agg_version}, Is New: {is_new}"  # noqa: 501
    )

    model: Optional[TSQLModel] = None  # Declare model once with its optional type
    try:
        if is_new:
            self._logger.debug(
                f"Aggregate ID {agg_id} is new. Creating root model."
            )
            model = self.model_cls(id=agg_id, version=-1)
            self._map_aggregate_to_model(aggregate, model)
            # Add to session first.
            session.add(model)
            # If add was successful, update versions.
            aggregate._increment_version()
            model.version = aggregate.version
            self._logger.info(
                f"Added new aggregate ID: {agg_id} to session with version {aggregate.version}. Commit required externally."  # noqa: 501
            )
        else:
            self._logger.debug(
                f"Aggregate ID {agg_id} exists. Loading root model for update."  # noqa: E501
            )
            model = await session.get(self.model_cls, agg_id)

            if not model:
                self._logger.error(
                    f"Aggregate ID {agg_id} not found in database during update attempt."  # noqa: 501
                )
                raise AggregateNotFoundError(agg_id)

            db_version = getattr(model, "version", None)
            if db_version is None:
                raise RepositoryError(
                    f"Database model {self.model_cls.__name__} for ID {agg_id} is missing 'version'. Cannot perform optimistic lock."  # noqa: 501
                )

            if db_version != current_agg_version:
                self._logger.warning(
                    f"Optimistic lock failed for ID: {agg_id}. Expected DB version {current_agg_version}, found {db_version}."  # noqa: 501
                )
                raise OptimisticConcurrencyError(
                    agg_id, current_agg_version, db_version
                )
            self._logger.debug(
                f"Optimistic lock check passed for ID {agg_id} (Version: {current_agg_version})."  # noqa: 501
            )

            self._map_aggregate_to_model(aggregate, model)
            aggregate._increment_version()
            model.version = aggregate.version

            self._logger.info(
                f"Updated aggregate ID: {agg_id} in session to version {aggregate.version}. Commit required externally."  # noqa: 501
            )
        return aggregate

    except (OptimisticConcurrencyError, AggregateNotFoundError) as e:
        self._logger.error(f"Save failed for aggregate ID {agg_id}: {e!s}")
        raise e
    except Exception as e:
        self._logger.exception(
            f"Unexpected error during save for aggregate ID {agg_id}: {e!s}"  # noqa: 501
        )
        raise RepositoryError(
            f"Save failed for aggregate ID {agg_id}: {e!s}"
        ) from e

Command dataclass

Bases: ABC

Abstract base class for all commands in the CQRS pattern. Commands represent an intention to change the system state. They should be immutable data structures.

Source code in src/castlecraft_engineer/abstractions/command.py
@dataclass(frozen=True)
class Command(abc.ABC):
    """
    Abstract base class for all commands in the CQRS pattern.
    Commands represent an intention to change the system state.
    They should be immutable data structures.
    """

CommandBus

Coordinates the execution of commands by routing them to registered handlers, using a globally accessible dependency injection container.

Source code in src/castlecraft_engineer/abstractions/command_bus.py
class CommandBus:
    """
    Coordinates the execution of commands by
    routing them to registered handlers,
    using a globally accessible
    dependency injection container.
    """

    def __init__(self, container: punq.Container) -> None:
        """
        Initializes the CommandBus with an
        empty handler registry.
        """

        self._container = container
        self._handler_classes: Dict[
            Type[Command],
            Type[CommandHandler[Any]],
        ] = {}

    def _get_command_type(
        self,
        handler_cls: Type[CommandHandler[TCommand]],
    ) -> Type[TCommand]:
        """
        Inspects a handler class to find the
        Command type it handles.

        Raises:
            TypeError: If the command type cannot be determined.
        """
        for base in getattr(handler_cls, "__orig_bases__", []):
            origin = typing.get_origin(base)
            if origin is CommandHandler:
                args = typing.get_args(base)
                if (
                    args
                    and isinstance(args[0], type)
                    and issubclass(args[0], Command)  # noqa: E501
                ):
                    return typing.cast(Type[TCommand], args[0])

        raise TypeError(
            "Could not determine Command type for "
            f"handler {handler_cls.__name__}. "
            "Ensure it inherits directly like: "
            "MyHandler(CommandHandler[MySpecificCommand])."
        )

    def register(
        self, handler_cls: Type[CommandHandler[TCommand]]
    ) -> Type[CommandHandler[TCommand]]:
        """
        Decorator to register a CommandHandler class with
        the bus and the global DI container.

        Args:
            handler_cls: The CommandHandler class to register.

        Returns:
            The original handler class, unchanged.

        Raises:
            TypeError: If the handler_cls is not a valid
                       CommandHandler subclass or its command
                       type cannot be determined.
        """
        is_class = inspect.isclass(handler_cls)
        if not is_class:
            raise TypeError(
                f"{repr(handler_cls)} is not a valid CommandHandler.",
            )

        if not issubclass(handler_cls, CommandHandler):
            raise TypeError(
                f"{handler_cls.__name__} is not a valid CommandHandler.",
            )

        command_type = self._get_command_type(handler_cls)

        if command_type in self._handler_classes:
            raise ValueError(
                f"Handler already registered for command {command_type.__name__}"  # noqa: E501
            )

        self._handler_classes[command_type] = handler_cls

        return handler_cls

    async def execute(
        self,
        command: Command,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        *args,
        **kwargs,
    ) -> Any:
        """
        Executes a command by finding its handler class,
        resolving it via the
        global DI container, authorizing, and handling.

        Args:
            command: The command instance to execute.
            subject_id: The ID of the subject attempting
                        to execute the command.
            permissions: The permissions associated
                        with the subject.

        Raises:
            CommandHandlerNotFoundError: If no handler class is
                                        registered for the command type.
            AuthorizationError: If the handler denies authorization.
            punq.MissingDependencyError: If the container cannot
                                        resolve the handler or
                                        its dependencies.
            Exception: Any other exception raised
                        during handler resolution
                        or execution.
        """
        command_type = type(command)
        handler_cls = self._handler_classes.get(command_type)

        if handler_cls is None:
            raise CommandHandlerNotFoundError(command_type)

        try:
            # Use the container directly
            handler = self._container.resolve(handler_cls)
        except MissingDependencyError as e:
            raise MissingDependencyError(
                "Failed to resolve handler "
                f"{handler_cls.__name__} for command "
                f"{command_type.__name__}: {e}"
            ) from e
        except Exception as e:
            raise RuntimeError(
                f"Unexpected error resolving handler for {handler_cls.__name__}: {e}"  # noqa: E501
            ) from e

        if not isinstance(handler, CommandHandler):
            raise TypeError(
                f"Resolved object for {handler_cls.__name__} is not a CommandHandler instance."  # noqa: E501
            )

        return await handler.execute(
            command,
            subject_id=subject_id,
            permissions=permissions,
            *args,
            **kwargs,
        )

__init__(container)

Initializes the CommandBus with an empty handler registry.

Source code in src/castlecraft_engineer/abstractions/command_bus.py
def __init__(self, container: punq.Container) -> None:
    """
    Initializes the CommandBus with an
    empty handler registry.
    """

    self._container = container
    self._handler_classes: Dict[
        Type[Command],
        Type[CommandHandler[Any]],
    ] = {}

execute(command, subject_id=None, permissions=[], *args, **kwargs) async

Executes a command by finding its handler class, resolving it via the global DI container, authorizing, and handling.

Parameters:

Name Type Description Default
command Command

The command instance to execute.

required
subject_id Optional[str]

The ID of the subject attempting to execute the command.

None
permissions List[Permission]

The permissions associated with the subject.

[]

Raises:

Type Description
CommandHandlerNotFoundError

If no handler class is registered for the command type.

AuthorizationError

If the handler denies authorization.

MissingDependencyError

If the container cannot resolve the handler or its dependencies.

Exception

Any other exception raised during handler resolution or execution.

Source code in src/castlecraft_engineer/abstractions/command_bus.py
async def execute(
    self,
    command: Command,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    *args,
    **kwargs,
) -> Any:
    """
    Executes a command by finding its handler class,
    resolving it via the
    global DI container, authorizing, and handling.

    Args:
        command: The command instance to execute.
        subject_id: The ID of the subject attempting
                    to execute the command.
        permissions: The permissions associated
                    with the subject.

    Raises:
        CommandHandlerNotFoundError: If no handler class is
                                    registered for the command type.
        AuthorizationError: If the handler denies authorization.
        punq.MissingDependencyError: If the container cannot
                                    resolve the handler or
                                    its dependencies.
        Exception: Any other exception raised
                    during handler resolution
                    or execution.
    """
    command_type = type(command)
    handler_cls = self._handler_classes.get(command_type)

    if handler_cls is None:
        raise CommandHandlerNotFoundError(command_type)

    try:
        # Use the container directly
        handler = self._container.resolve(handler_cls)
    except MissingDependencyError as e:
        raise MissingDependencyError(
            "Failed to resolve handler "
            f"{handler_cls.__name__} for command "
            f"{command_type.__name__}: {e}"
        ) from e
    except Exception as e:
        raise RuntimeError(
            f"Unexpected error resolving handler for {handler_cls.__name__}: {e}"  # noqa: E501
        ) from e

    if not isinstance(handler, CommandHandler):
        raise TypeError(
            f"Resolved object for {handler_cls.__name__} is not a CommandHandler instance."  # noqa: E501
        )

    return await handler.execute(
        command,
        subject_id=subject_id,
        permissions=permissions,
        *args,
        **kwargs,
    )

register(handler_cls)

Decorator to register a CommandHandler class with the bus and the global DI container.

Parameters:

Name Type Description Default
handler_cls Type[CommandHandler[TCommand]]

The CommandHandler class to register.

required

Returns:

Type Description
Type[CommandHandler[TCommand]]

The original handler class, unchanged.

Raises:

Type Description
TypeError

If the handler_cls is not a valid CommandHandler subclass or its command type cannot be determined.

Source code in src/castlecraft_engineer/abstractions/command_bus.py
def register(
    self, handler_cls: Type[CommandHandler[TCommand]]
) -> Type[CommandHandler[TCommand]]:
    """
    Decorator to register a CommandHandler class with
    the bus and the global DI container.

    Args:
        handler_cls: The CommandHandler class to register.

    Returns:
        The original handler class, unchanged.

    Raises:
        TypeError: If the handler_cls is not a valid
                   CommandHandler subclass or its command
                   type cannot be determined.
    """
    is_class = inspect.isclass(handler_cls)
    if not is_class:
        raise TypeError(
            f"{repr(handler_cls)} is not a valid CommandHandler.",
        )

    if not issubclass(handler_cls, CommandHandler):
        raise TypeError(
            f"{handler_cls.__name__} is not a valid CommandHandler.",
        )

    command_type = self._get_command_type(handler_cls)

    if command_type in self._handler_classes:
        raise ValueError(
            f"Handler already registered for command {command_type.__name__}"  # noqa: E501
        )

    self._handler_classes[command_type] = handler_cls

    return handler_cls

CommandHandler

Bases: Generic[TCommand], ABC

Abstract base class for command handlers. Each handler is responsible for processing a specific type of command.

Source code in src/castlecraft_engineer/abstractions/command_handler.py
class CommandHandler(Generic[TCommand], abc.ABC):
    """
    Abstract base class for command handlers.
    Each handler is responsible for processing
    a specific type of command.
    """

    async def authorize(
        self,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        *args: Any,
        **kwargs: Any,
    ) -> Optional[bool]:
        """
        Optional pre-execution authorization check for the command.

        NOTE: This method is NOT automatically called by the default CommandBus.
        It serves as a convention or hook for developers to implement custom
        pre-authorization logic if they choose to call it explicitly from within
        their `execute` method, or if they are using a custom bus implementation
        that invokes it.

        The recommended pattern for authorization with the default bus is to use
        an injected `AuthorizationService` within the `execute` method.

        Args:
            subject_id: The ID of the subject.
            permissions: A list of Permission objects representing the
                         permissions granted to the subject.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments for context.
        Returns:
            True if the subject has permission, False otherwise.
        """
        pass  # pragma: no cover

    @abc.abstractmethod
    async def execute(
        self,
        command: TCommand,
        *args,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        **kwargs,
    ) -> Any:
        """
        Handles the execution logic for the given command.

        Args:
            command: The command instance to be processed.
            subject_id: The ID of the subject attempting
                        to execute the command. Optional.
            permissions: The permissions associated with the subject.

        Raises:
            NotImplementedError: This method must be implemented by concrete subclasses.
        """
        raise NotImplementedError

authorize(subject_id=None, permissions=[], *args, **kwargs) async

Optional pre-execution authorization check for the command.

NOTE: This method is NOT automatically called by the default CommandBus. It serves as a convention or hook for developers to implement custom pre-authorization logic if they choose to call it explicitly from within their execute method, or if they are using a custom bus implementation that invokes it.

The recommended pattern for authorization with the default bus is to use an injected AuthorizationService within the execute method.

Parameters:

Name Type Description Default
subject_id Optional[str]

The ID of the subject.

None
permissions List[Permission]

A list of Permission objects representing the permissions granted to the subject.

[]
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments for context.

{}

Returns: True if the subject has permission, False otherwise.

Source code in src/castlecraft_engineer/abstractions/command_handler.py
async def authorize(
    self,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    *args: Any,
    **kwargs: Any,
) -> Optional[bool]:
    """
    Optional pre-execution authorization check for the command.

    NOTE: This method is NOT automatically called by the default CommandBus.
    It serves as a convention or hook for developers to implement custom
    pre-authorization logic if they choose to call it explicitly from within
    their `execute` method, or if they are using a custom bus implementation
    that invokes it.

    The recommended pattern for authorization with the default bus is to use
    an injected `AuthorizationService` within the `execute` method.

    Args:
        subject_id: The ID of the subject.
        permissions: A list of Permission objects representing the
                     permissions granted to the subject.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments for context.
    Returns:
        True if the subject has permission, False otherwise.
    """
    pass  # pragma: no cover

execute(command, *args, subject_id=None, permissions=[], **kwargs) abstractmethod async

Handles the execution logic for the given command.

Parameters:

Name Type Description Default
command TCommand

The command instance to be processed.

required
subject_id Optional[str]

The ID of the subject attempting to execute the command. Optional.

None
permissions List[Permission]

The permissions associated with the subject.

[]

Raises:

Type Description
NotImplementedError

This method must be implemented by concrete subclasses.

Source code in src/castlecraft_engineer/abstractions/command_handler.py
@abc.abstractmethod
async def execute(
    self,
    command: TCommand,
    *args,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    **kwargs,
) -> Any:
    """
    Handles the execution logic for the given command.

    Args:
        command: The command instance to be processed.
        subject_id: The ID of the subject attempting
                    to execute the command. Optional.
        permissions: The permissions associated with the subject.

    Raises:
        NotImplementedError: This method must be implemented by concrete subclasses.
    """
    raise NotImplementedError

EventHandler

Bases: Generic[TEvent], ABC

Abstract base class for event handlers. Each handler is responsible for reacting to a specific type of domain event.

Source code in src/castlecraft_engineer/abstractions/event_handler.py
class EventHandler(Generic[TEvent], abc.ABC):
    """
    Abstract base class for event handlers.
    Each handler is responsible for reacting to
    a specific type of domain event.
    """

    @abc.abstractmethod
    async def handle(self, event: TEvent) -> None:
        """
        Handles the logic to execute when the specific event occurs.
        Marked as async to easily accommodate I/O operations
        (like sending emails, updating databases, calling APIs).

        Args:
            event: The event instance to be processed.

        Raises:
            NotImplementedError: This method must be implemented by
                                concrete subclasses.
        """
        raise NotImplementedError

handle(event) abstractmethod async

Handles the logic to execute when the specific event occurs. Marked as async to easily accommodate I/O operations (like sending emails, updating databases, calling APIs).

Parameters:

Name Type Description Default
event TEvent

The event instance to be processed.

required

Raises:

Type Description
NotImplementedError

This method must be implemented by concrete subclasses.

Source code in src/castlecraft_engineer/abstractions/event_handler.py
@abc.abstractmethod
async def handle(self, event: TEvent) -> None:
    """
    Handles the logic to execute when the specific event occurs.
    Marked as async to easily accommodate I/O operations
    (like sending emails, updating databases, calling APIs).

    Args:
        event: The event instance to be processed.

    Raises:
        NotImplementedError: This method must be implemented by
                            concrete subclasses.
    """
    raise NotImplementedError

EventStore

Bases: Generic[TAggregateId], ABC

Abstract base class for an event store, responsible for persisting and retrieving streams of domain events.

Source code in src/castlecraft_engineer/abstractions/event_store.py
class EventStore(Generic[TAggregateId], abc.ABC):
    """
    Abstract base class for an event store, responsible for persisting
    and retrieving streams of domain events.
    """

    @abc.abstractmethod
    async def append_events(
        self,
        aggregate_id: TAggregateId,
        expected_version: int,
        events: List[Event],
    ) -> None:
        """
        Appends a list of events to the stream for a given aggregate.

        Args:
            aggregate_id: The ID of the aggregate to which the events belong.
            expected_version: The version of the aggregate that these events are based on.
                              Used for optimistic concurrency control. If the current
                              version in the store does not match this, an
                              EventStoreConflictError should be raised.
            events: A list of domain event instances to append.

        Raises:
            EventStoreConflictError: If the expected_version does not match the
                                     current version of the event stream for the aggregate.
            Exception: Implementation-specific exceptions related to storage failures.
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def load_events(
        self,
        aggregate_id: TAggregateId,
        from_version: Optional[int] = None,
    ) -> List[Event]:
        """
        Loads the stream of events for a given aggregate.

        Args:
            aggregate_id: The ID of the aggregate whose events are to be loaded.
            from_version: Optionally, the version from which to start loading events.
                          If None, loads all events for the aggregate.

        Returns:
            A list of domain event instances, ordered by their sequence.
            Returns an empty list if the aggregate has no events or doesn't exist.
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
        """
        Retrieves the current version of the event stream for a given aggregate.

        Args:
            aggregate_id: The ID of the aggregate.

        Returns:
            The current version (number of events - 1, or sequence of last event)
            or None if the aggregate stream doesn't exist.
        """
        raise NotImplementedError

append_events(aggregate_id, expected_version, events) abstractmethod async

Appends a list of events to the stream for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate to which the events belong.

required
expected_version int

The version of the aggregate that these events are based on. Used for optimistic concurrency control. If the current version in the store does not match this, an EventStoreConflictError should be raised.

required
events List[Event]

A list of domain event instances to append.

required

Raises:

Type Description
EventStoreConflictError

If the expected_version does not match the current version of the event stream for the aggregate.

Exception

Implementation-specific exceptions related to storage failures.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def append_events(
    self,
    aggregate_id: TAggregateId,
    expected_version: int,
    events: List[Event],
) -> None:
    """
    Appends a list of events to the stream for a given aggregate.

    Args:
        aggregate_id: The ID of the aggregate to which the events belong.
        expected_version: The version of the aggregate that these events are based on.
                          Used for optimistic concurrency control. If the current
                          version in the store does not match this, an
                          EventStoreConflictError should be raised.
        events: A list of domain event instances to append.

    Raises:
        EventStoreConflictError: If the expected_version does not match the
                                 current version of the event stream for the aggregate.
        Exception: Implementation-specific exceptions related to storage failures.
    """
    raise NotImplementedError

get_current_version(aggregate_id) abstractmethod async

Retrieves the current version of the event stream for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate.

required

Returns:

Type Description
Optional[int]

The current version (number of events - 1, or sequence of last event)

Optional[int]

or None if the aggregate stream doesn't exist.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def get_current_version(self, aggregate_id: TAggregateId) -> Optional[int]:
    """
    Retrieves the current version of the event stream for a given aggregate.

    Args:
        aggregate_id: The ID of the aggregate.

    Returns:
        The current version (number of events - 1, or sequence of last event)
        or None if the aggregate stream doesn't exist.
    """
    raise NotImplementedError

load_events(aggregate_id, from_version=None) abstractmethod async

Loads the stream of events for a given aggregate.

Parameters:

Name Type Description Default
aggregate_id TAggregateId

The ID of the aggregate whose events are to be loaded.

required
from_version Optional[int]

Optionally, the version from which to start loading events. If None, loads all events for the aggregate.

None

Returns:

Type Description
List[Event]

A list of domain event instances, ordered by their sequence.

List[Event]

Returns an empty list if the aggregate has no events or doesn't exist.

Source code in src/castlecraft_engineer/abstractions/event_store.py
@abc.abstractmethod
async def load_events(
    self,
    aggregate_id: TAggregateId,
    from_version: Optional[int] = None,
) -> List[Event]:
    """
    Loads the stream of events for a given aggregate.

    Args:
        aggregate_id: The ID of the aggregate whose events are to be loaded.
        from_version: Optionally, the version from which to start loading events.
                      If None, loads all events for the aggregate.

    Returns:
        A list of domain event instances, ordered by their sequence.
        Returns an empty list if the aggregate has no events or doesn't exist.
    """
    raise NotImplementedError

EventStreamConsumer

Bases: ABC

Abstract base class for consuming events from an external stream and publishing them to an internal EventBus.

Source code in src/castlecraft_engineer/abstractions/event_consumer.py
class EventStreamConsumer(ABC):
    """
    Abstract base class for consuming events from an external stream
    and publishing them to an internal EventBus.
    """

    @abstractmethod
    async def run(self):
        """
        Starts the consumer loop to listen for and process events.
        This method should typically run indefinitely
        until shutdown is requested.
        """
        raise NotImplementedError

    @abstractmethod
    async def shutdown(self):
        """
        Initiates a graceful shutdown of the consumer.
        Implementations should ensure pending work is
        handled and resources are released.
        """
        raise NotImplementedError

run() abstractmethod async

Starts the consumer loop to listen for and process events. This method should typically run indefinitely until shutdown is requested.

Source code in src/castlecraft_engineer/abstractions/event_consumer.py
@abstractmethod
async def run(self):
    """
    Starts the consumer loop to listen for and process events.
    This method should typically run indefinitely
    until shutdown is requested.
    """
    raise NotImplementedError

shutdown() abstractmethod async

Initiates a graceful shutdown of the consumer. Implementations should ensure pending work is handled and resources are released.

Source code in src/castlecraft_engineer/abstractions/event_consumer.py
@abstractmethod
async def shutdown(self):
    """
    Initiates a graceful shutdown of the consumer.
    Implementations should ensure pending work is
    handled and resources are released.
    """
    raise NotImplementedError

ExternalEventPublisher

Bases: ABC

Abstract base class for publishing domain events to external messaging systems (e.g., Kafka, RabbitMQ, Redis Streams).

Source code in src/castlecraft_engineer/abstractions/event_publisher.py
class ExternalEventPublisher(abc.ABC):
    """
    Abstract base class for publishing domain events to external
    messaging systems (e.g., Kafka, RabbitMQ, Redis Streams).
    """

    @abc.abstractmethod
    async def publish(self, events: List[Event]) -> None:
        """
        Publishes a list of domain events to the external broker.

        Implementations should handle serialization,connection management,
        and the specifics of the chosen message broker protocol.

        Args:
            events: A list of domain event instances to publish.

        Raises:
            NotImplementedError: This method must be implemented by
                                 concrete subclasses.
            Exception: Implementation-specific exceptions related to
                       publishing failures (e.g., connection errors,
                       serialization issues).
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def close(self) -> None:
        """Clean up resources, like network connections."""
        raise NotImplementedError

close() abstractmethod async

Clean up resources, like network connections.

Source code in src/castlecraft_engineer/abstractions/event_publisher.py
@abc.abstractmethod
async def close(self) -> None:
    """Clean up resources, like network connections."""
    raise NotImplementedError

publish(events) abstractmethod async

Publishes a list of domain events to the external broker.

Implementations should handle serialization,connection management, and the specifics of the chosen message broker protocol.

Parameters:

Name Type Description Default
events List[Event]

A list of domain event instances to publish.

required

Raises:

Type Description
NotImplementedError

This method must be implemented by concrete subclasses.

Exception

Implementation-specific exceptions related to publishing failures (e.g., connection errors, serialization issues).

Source code in src/castlecraft_engineer/abstractions/event_publisher.py
@abc.abstractmethod
async def publish(self, events: List[Event]) -> None:
    """
    Publishes a list of domain events to the external broker.

    Implementations should handle serialization,connection management,
    and the specifics of the chosen message broker protocol.

    Args:
        events: A list of domain event instances to publish.

    Raises:
        NotImplementedError: This method must be implemented by
                             concrete subclasses.
        Exception: Implementation-specific exceptions related to
                   publishing failures (e.g., connection errors,
                   serialization issues).
    """
    raise NotImplementedError

Query dataclass

Bases: ABC

Abstract base class for all Queries in the CQRS pattern. Query represent an intention to request the system state. They should be immutable data structures.

Source code in src/castlecraft_engineer/abstractions/query.py
@dataclass(frozen=True)
class Query(abc.ABC):
    """
    Abstract base class for all Queries in the CQRS pattern.
    Query represent an intention to request the system state.
    They should be immutable data structures.
    """

QueryBus

Coordinates the execution of query by routing them to registered handlers, using a globally accessible dependency injection container.

Source code in src/castlecraft_engineer/abstractions/query_bus.py
class QueryBus:
    """
    Coordinates the execution of query by
    routing them to registered handlers,
    using a globally accessible
    dependency injection container.
    """

    def __init__(self, container: punq.Container) -> None:
        """
        Initializes the QueryBus with an
        empty handler registry.
        It relies on a globally accessible
        DI container ('container').
        """

        self._container = container
        self._handler_classes: Dict[
            Type[Query],
            Type[QueryHandler[Any]],
        ] = {}

    def _get_query_type(
        self,
        handler_cls: Type[QueryHandler[TQuery]],
    ) -> Type[TQuery]:
        """
        Inspects a handler class to find the
        Query type it handles.

        Raises:
            TypeError: If the query type cannot be determined.
        """

        for base in getattr(handler_cls, "__orig_bases__", []):
            origin = typing.get_origin(base)
            if origin is QueryHandler:
                args = typing.get_args(base)
                if (
                    args
                    and isinstance(args[0], type)
                    and issubclass(args[0], Query)  # noqa: E501
                ):
                    return typing.cast(Type[TQuery], args[0])

        raise TypeError(
            "Could not determine Query type for "
            f"handler {handler_cls.__name__}. "
            "Ensure it inherits directly like: "
            "MyHandler(QueryHandler[MySpecificQuery])."
        )

    def register(
        self, handler_cls: Type[QueryHandler[TQuery]]
    ) -> Type[QueryHandler[TQuery]]:
        """
        Decorator to register a QueryHandler class with
        the bus and the global DI container.

        Args:
            handler_cls: The QueryHandler class to register.

        Returns:
            The original handler class, unchanged.

        Raises:
            TypeError: If the handler_cls is not a valid
                       QueryHandler subclass or its query
                       type cannot be determined.
            ValueError: If a handler is already registered
                        for the query type.
        """

        is_class = inspect.isclass(handler_cls)
        if not is_class:
            raise TypeError(
                f"{repr(handler_cls)} is not a valid QueryHandler.",
            )

        if not issubclass(handler_cls, QueryHandler):
            raise TypeError(
                f"{handler_cls.__name__} is not a valid QueryHandler.",
            )

        query_type = self._get_query_type(handler_cls)

        if query_type in self._handler_classes:
            raise ValueError(
                f"Handler already registered for query {query_type.__name__}"  # noqa: E501
            )

        self._handler_classes[query_type] = handler_cls

        return handler_cls

    async def execute(
        self,
        query: Query,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        *args,
        **kwargs,
    ) -> Any:
        """
        Executes a query by finding its handler class,
        resolving it via the
        global DI container, authorizing, and handling.

        Args:
            query: The query instance to execute.

        Raises:
            QueryHandlerNotFoundError: If no handler class is
                                        registered for the query type.
            punq.MissingDependencyError: If the container cannot
                                        resolve the handler or
                                        its dependencies.
            Exception: Any other exception raised
                        during handler resolution
                        or execution.
        """  # noqa: E501
        query_type: Type[Query] = type(query)
        handler_cls = self._handler_classes.get(query_type)

        if handler_cls is None:
            raise QueryHandlerNotFoundError(query_type)

        try:
            handler = self._container.resolve(handler_cls)
        except MissingDependencyError as e:
            raise MissingDependencyError(
                "Failed to resolve handler "
                f"{handler_cls.__name__} for query "
                f"{query_type.__name__}: {e}"
            ) from e
        except Exception as e:
            raise RuntimeError(
                f"Unexpected error resolving handler for {handler_cls.__name__}: {e}"  # noqa: E501
            ) from e

        if not isinstance(handler, QueryHandler):
            raise TypeError(
                f"Resolved object for {handler_cls.__name__} is not a QueryHandler instance."  # noqa: E501
            )

        return await handler.execute(
            query,
            subject_id=subject_id,
            permissions=permissions,
            *args,
            **kwargs,
        )

__init__(container)

Initializes the QueryBus with an empty handler registry. It relies on a globally accessible DI container ('container').

Source code in src/castlecraft_engineer/abstractions/query_bus.py
def __init__(self, container: punq.Container) -> None:
    """
    Initializes the QueryBus with an
    empty handler registry.
    It relies on a globally accessible
    DI container ('container').
    """

    self._container = container
    self._handler_classes: Dict[
        Type[Query],
        Type[QueryHandler[Any]],
    ] = {}

execute(query, subject_id=None, permissions=[], *args, **kwargs) async

Executes a query by finding its handler class, resolving it via the global DI container, authorizing, and handling.

Parameters:

Name Type Description Default
query Query

The query instance to execute.

required

Raises:

Type Description
QueryHandlerNotFoundError

If no handler class is registered for the query type.

MissingDependencyError

If the container cannot resolve the handler or its dependencies.

Exception

Any other exception raised during handler resolution or execution.

Source code in src/castlecraft_engineer/abstractions/query_bus.py
async def execute(
    self,
    query: Query,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    *args,
    **kwargs,
) -> Any:
    """
    Executes a query by finding its handler class,
    resolving it via the
    global DI container, authorizing, and handling.

    Args:
        query: The query instance to execute.

    Raises:
        QueryHandlerNotFoundError: If no handler class is
                                    registered for the query type.
        punq.MissingDependencyError: If the container cannot
                                    resolve the handler or
                                    its dependencies.
        Exception: Any other exception raised
                    during handler resolution
                    or execution.
    """  # noqa: E501
    query_type: Type[Query] = type(query)
    handler_cls = self._handler_classes.get(query_type)

    if handler_cls is None:
        raise QueryHandlerNotFoundError(query_type)

    try:
        handler = self._container.resolve(handler_cls)
    except MissingDependencyError as e:
        raise MissingDependencyError(
            "Failed to resolve handler "
            f"{handler_cls.__name__} for query "
            f"{query_type.__name__}: {e}"
        ) from e
    except Exception as e:
        raise RuntimeError(
            f"Unexpected error resolving handler for {handler_cls.__name__}: {e}"  # noqa: E501
        ) from e

    if not isinstance(handler, QueryHandler):
        raise TypeError(
            f"Resolved object for {handler_cls.__name__} is not a QueryHandler instance."  # noqa: E501
        )

    return await handler.execute(
        query,
        subject_id=subject_id,
        permissions=permissions,
        *args,
        **kwargs,
    )

register(handler_cls)

Decorator to register a QueryHandler class with the bus and the global DI container.

Parameters:

Name Type Description Default
handler_cls Type[QueryHandler[TQuery]]

The QueryHandler class to register.

required

Returns:

Type Description
Type[QueryHandler[TQuery]]

The original handler class, unchanged.

Raises:

Type Description
TypeError

If the handler_cls is not a valid QueryHandler subclass or its query type cannot be determined.

ValueError

If a handler is already registered for the query type.

Source code in src/castlecraft_engineer/abstractions/query_bus.py
def register(
    self, handler_cls: Type[QueryHandler[TQuery]]
) -> Type[QueryHandler[TQuery]]:
    """
    Decorator to register a QueryHandler class with
    the bus and the global DI container.

    Args:
        handler_cls: The QueryHandler class to register.

    Returns:
        The original handler class, unchanged.

    Raises:
        TypeError: If the handler_cls is not a valid
                   QueryHandler subclass or its query
                   type cannot be determined.
        ValueError: If a handler is already registered
                    for the query type.
    """

    is_class = inspect.isclass(handler_cls)
    if not is_class:
        raise TypeError(
            f"{repr(handler_cls)} is not a valid QueryHandler.",
        )

    if not issubclass(handler_cls, QueryHandler):
        raise TypeError(
            f"{handler_cls.__name__} is not a valid QueryHandler.",
        )

    query_type = self._get_query_type(handler_cls)

    if query_type in self._handler_classes:
        raise ValueError(
            f"Handler already registered for query {query_type.__name__}"  # noqa: E501
        )

    self._handler_classes[query_type] = handler_cls

    return handler_cls

QueryHandler

Bases: Generic[TQuery], ABC

Abstract base class for query handlers. Each handler is responsible for processing a specific type of query.

Source code in src/castlecraft_engineer/abstractions/query_handler.py
class QueryHandler(Generic[TQuery], abc.ABC):
    """
    Abstract base class for query handlers.
    Each handler is responsible for processing
    a specific type of query.
    """

    async def authorize(
        self,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        *args: Any,
        **kwargs: Any,
    ) -> Optional[bool]:
        """
        Optional pre-execution authorization check for the query.

        NOTE: This method is NOT automatically called by the default QueryBus.
        It serves as a convention or hook for developers to implement custom
        pre-authorization logic if they choose to call it explicitly from within
        their `execute` method, or if they are using a custom bus implementation
        that invokes it.

        The recommended pattern for authorization with the default bus is to use
        an injected `AuthorizationService` within the `execute` method.

        Args:
            subject_id: The ID of the subject.
            permissions: A list of Permission objects representing the
                         permissions granted to the subject.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments for context.
        Returns:
            True if the subject has permission, False otherwise.
        """
        pass  # pragma: no cover

    @abc.abstractmethod
    async def execute(
        self,
        query: TQuery,
        *args,
        subject_id: Optional[str] = None,
        permissions: List[Permission] = [],
        **kwargs,
    ) -> Any:
        """
        Handles the execution logic for the given query.

        Args:
            query: The query instance to be processed.
            subject_id: The ID of the subject attempting
                        to execute the query. Optional.
            permissions: The permissions associated with the subject.

        Raises:
            NotImplementedError: This method must be implemented by concrete subclasses.
        """
        raise NotImplementedError

authorize(subject_id=None, permissions=[], *args, **kwargs) async

Optional pre-execution authorization check for the query.

NOTE: This method is NOT automatically called by the default QueryBus. It serves as a convention or hook for developers to implement custom pre-authorization logic if they choose to call it explicitly from within their execute method, or if they are using a custom bus implementation that invokes it.

The recommended pattern for authorization with the default bus is to use an injected AuthorizationService within the execute method.

Parameters:

Name Type Description Default
subject_id Optional[str]

The ID of the subject.

None
permissions List[Permission]

A list of Permission objects representing the permissions granted to the subject.

[]
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments for context.

{}

Returns: True if the subject has permission, False otherwise.

Source code in src/castlecraft_engineer/abstractions/query_handler.py
async def authorize(
    self,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    *args: Any,
    **kwargs: Any,
) -> Optional[bool]:
    """
    Optional pre-execution authorization check for the query.

    NOTE: This method is NOT automatically called by the default QueryBus.
    It serves as a convention or hook for developers to implement custom
    pre-authorization logic if they choose to call it explicitly from within
    their `execute` method, or if they are using a custom bus implementation
    that invokes it.

    The recommended pattern for authorization with the default bus is to use
    an injected `AuthorizationService` within the `execute` method.

    Args:
        subject_id: The ID of the subject.
        permissions: A list of Permission objects representing the
                     permissions granted to the subject.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments for context.
    Returns:
        True if the subject has permission, False otherwise.
    """
    pass  # pragma: no cover

execute(query, *args, subject_id=None, permissions=[], **kwargs) abstractmethod async

Handles the execution logic for the given query.

Parameters:

Name Type Description Default
query TQuery

The query instance to be processed.

required
subject_id Optional[str]

The ID of the subject attempting to execute the query. Optional.

None
permissions List[Permission]

The permissions associated with the subject.

[]

Raises:

Type Description
NotImplementedError

This method must be implemented by concrete subclasses.

Source code in src/castlecraft_engineer/abstractions/query_handler.py
@abc.abstractmethod
async def execute(
    self,
    query: TQuery,
    *args,
    subject_id: Optional[str] = None,
    permissions: List[Permission] = [],
    **kwargs,
) -> Any:
    """
    Handles the execution logic for the given query.

    Args:
        query: The query instance to be processed.
        subject_id: The ID of the subject attempting
                    to execute the query. Optional.
        permissions: The permissions associated with the subject.

    Raises:
        NotImplementedError: This method must be implemented by concrete subclasses.
    """
    raise NotImplementedError