Skip to content

Queries, Query Handlers, and Query Bus

In a CQRS (Command Query Responsibility Segregation) architecture, Queries are responsible for retrieving data from the system without causing any state changes. castlecraft-engineer provides a structured approach to define, handle, and dispatch these queries.

1. The Query Base Class

All queries in your application should inherit from the Query base class, located in castlecraft_engineer.abstractions.query.Query.

  • Purpose: A Query is a simple data structure (often a dataclass) that encapsulates all the parameters required to fetch specific data. It's a message that expresses a request for information.
  • Characteristics:
    • Typically named to reflect the data being requested (e.g., GetUserByIdQuery, ListActiveProductsQuery).
    • Should be immutable once created.
    • Carry data (parameters for the query) but no behavior.
    • Crucially, executing a query should never modify system state.
# Example: Defining a Query
from dataclasses import dataclass
import uuid
from castlecraft_engineer.abstractions.query import Query

@dataclass(frozen=True)
class GetProductDetailsQuery(Query[dict]): # TResult is dict here, representing product details
    product_id: uuid.UUID

2. The QueryHandler Base Class

For every query, there should be a corresponding QueryHandler. Query Handlers contain the logic to process a query and return the requested data. They inherit from castlecraft_engineer.abstractions.query_handler.QueryHandler[TQuery, TResult].

  • Purpose: To encapsulate the logic required to execute a specific query. This often involves interacting with repositories (like ModelRepository or AsyncModelRepository for direct model access, or AggregateRepository if querying an aggregate's state) or other data sources.
  • Key Methods:
    • execute(self, query: TQuery) -> TResult (or async execute for asynchronous handlers): This is the main method where the query's logic is implemented. It receives the query object and returns the fetched data (TResult).
    • authorize(self, query: TQuery) -> None (or async authorize): This method is called by the QueryBus before execute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given query and access the requested data. If authorization fails, it should raise an AuthorizationError.
# Example: Defining a QueryHandler
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.database.repository import AsyncModelRepository # Example for fetching data
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error

# Assuming GetProductDetailsQuery is defined as above
# Assuming ProductModel is a SQLModel representing product data
# from your_models.product import ProductModel

class ProductModel(SQLModel, table=True): # Dummy for example
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float

class ProductRepository(AsyncModelRepository[ProductModel, uuid.UUID]):
    def __init__(self):
        super().__init__(model_cls=ProductModel)

class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery, Optional[Dict[str, Any]]]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetProductDetailsQuery) -> None:
        # Example: Check if the current context can view this product.
        # This is a simplified check. Real checks might involve user roles, resource ownership etc.
        # await self.auth_service.check_permission(required_permissions=[('product', 'read', query.product_id)])
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id}")
        # If not authorized: raise AuthorizationError("Not authorized to view this product.")
        pass # For this example, always allow

    async def execute(self, query: GetProductDetailsQuery) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        if product_model:
            return {
                "id": product_model.id,
                "name": product_model.name,
                "description": product_model.description,
                "price": product_model.price,
            }
        return None

3. The QueryBus

The QueryBus (from castlecraft_engineer.abstractions.query_bus.QueryBus) acts as a central dispatcher for queries. It decouples the part of your application that requests data (e.g., an API endpoint) from the handler that retrieves it.

  • Purpose: To route queries to their appropriate handlers.
  • Key Functionalities:
    • Handler Registration: Query Handlers are registered with the QueryBus for specific Query types. This is often done at application startup, typically using a Dependency Injection (DI) container.
      • castlecraft-engineer's QueryBus uses a decorator for registration: @query_bus.register(QueryType, QueryHandlerType) when used with the ContainerBuilder from common/di.py.
    • Handler Resolution: When a query is dispatched, the bus looks up the registered handler for that query's type. castlecraft-engineer's QueryBus resolves handlers from a punq.Container (the DI container).
    • Execution Flow: When query_bus.execute(query) is called:
      1. The bus resolves the handler for the type of query.
      2. It calls the handler's authorize(query) method.
      3. If authorization succeeds, it calls the handler's execute(query) method.
      4. It returns the result from the handler's execute method.

4. QueryHandlerNotFoundError Exception

If the QueryBus receives a query for which no handler has been registered, it will raise a QueryHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.query_bus and re-exported in castlecraft_engineer.exc.

It's crucial to ensure all your queries have corresponding handlers registered with the bus.

5. Code Example: Wiring Up and Executing

This example shows how to set up the DI container, register the handler, and execute a query via the bus.

import asyncio
import uuid
from typing import Optional, Dict, Any
from sqlmodel import SQLModel, Field # For dummy ProductModel
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import QueryHandlerNotFoundError, AuthorizationError

# Assuming GetProductDetailsQuery, ProductModel, ProductRepository, and GetProductDetailsHandler are defined as above
# from your_app.queries import GetProductDetailsQuery
# from your_app.handlers import GetProductDetailsHandler
# from your_app.models import ProductModel, ProductRepository

async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession factory for this example if not using a real DB
    class MockAsyncSession:
        async def __aenter__(self): return self
        async def __aexit__(self, exc_type, exc, tb): pass
        async def commit(self): print("MockSession: commit called")
        async def rollback(self): print("MockSession: rollback called")

    builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
    builder.register(MockAsyncSession) # Registering the mock session itself for AsyncSession type hint
    builder.register(ProductRepository) # Register the repository

    # Register QueryBus and the handler
    builder.with_query_bus()
    # The QueryBus instance is available on the builder after with_query_bus()
    # GetProductDetailsHandler depends on AsyncSession, ProductRepository, and AuthorizationService
    builder.query_bus.register(GetProductDetailsQuery, GetProductDetailsHandler)

    container = builder.build()

    # 2. Resolve QueryBus
    query_bus = container.resolve(QueryBus)

    # 3. Create and Execute Query
    product_id_to_query = uuid.uuid4() # Assume this product exists for the example
    # In a real scenario, you might first create a product or have one in the DB.
    # For this example, the handler will likely return None if the mock repo doesn't find it.

    query = GetProductDetailsQuery(product_id=product_id_to_query)

    try:
        print(f"Dispatching query: {query}")
        product_details = await query_bus.execute(query)
        if product_details:
            print(f"Query executed. Result (product_details): {product_details}")
        else:
            print(f"Query executed. Product with ID {product_id_to_query} not found.")
    except QueryHandlerNotFoundError:
        print(f"Error: No handler found for {type(query).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    # To run this example, ensure classes are defined or imported correctly.
    # For simplicity, if you copy-paste, define GetProductDetailsQuery, ProductModel,
    # ProductRepository, and GetProductDetailsHandler directly in this script before main().
    asyncio.run(main())

This demonstrates the complete flow for queries: defining the query and its handler, registering them with the DI container and QueryBus, and finally executing the query to retrieve data. The authorize method in the handler provides a crucial hook for permission checks before data access.