Skip to content

Queries, Query Handlers, and Query Bus

In a CQRS (Command Query Responsibility Segregation) architecture, Queries are responsible for retrieving data from the system without causing any state changes. castlecraft-engineer provides a structured approach to define, handle, and dispatch these queries.

1. The Query Base Class

All queries in your application should inherit from the Query base class, located in castlecraft_engineer.abstractions.query.Query.

  • Purpose: A Query is a simple data structure (often a dataclass) that encapsulates all the parameters required to fetch specific data. It's a message that expresses a request for information.
  • Characteristics:
    • Typically named to reflect the data being requested (e.g., GetUserByIdQuery, ListActiveProductsQuery).
    • Should be immutable once created (e.g., using @dataclass(frozen=True)).
    • Carry data (parameters for the query) but no behavior.
    • Crucially, executing a query should never modify system state.
    • The base Query class can be generic, Query[TResult], allowing you to specify the expected result type of the query.
# Example: Defining a Query
from dataclasses import dataclass
import uuid
from typing import Optional, Dict, Any # For TResult type hint
from castlecraft_engineer.abstractions.query import Query

@dataclass(frozen=True)
# If Query base class is not generic, or if result type is handled by handler only:
class GetProductDetailsQuery(Query): # No TResult here if base Query isn't generic
    product_id: uuid.UUID

2. The QueryHandler Base Class

For every query, there should be a corresponding QueryHandler. Query Handlers contain the logic to process a query and return the requested data. They inherit from castlecraft_engineer.abstractions.query_handler.QueryHandler[TQuery, TResult].

  • Purpose: To encapsulate the logic required to execute a specific query. This often involves interacting with repositories or other data sources.
  • Key Methods:
    • execute(self, query: TQuery, **kwargs) -> TResult (or async execute): This is the main method where the query's logic is implemented. It receives the query object and returns the fetched data (TResult). The **kwargs can receive context like subject_id or permissions from the bus.
    • authorize(self, query: TQuery, subject_id: Optional[str] = None, **kwargs) -> bool (or async authorize): This method is called by the QueryBus before execute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given query and access the requested data. If authorization fails, it should raise an AuthorizationError. It should return True if authorized.
# Example: Defining a QueryHandler
import uuid # Ensure uuid is imported for ProductModel example
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from sqlmodel import SQLModel, Field # For dummy ProductModel
from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.database.repository import AsyncModelRepository # Example for fetching data
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error

# Assuming GetProductDetailsQuery is defined as above

# Dummy ProductModel for the example
class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float

# Dummy ProductRepository for the example
class ProductRepository(AsyncModelRepository[ProductModel]): # TModel is ProductModel
    def __init__(self):
        super().__init__(model_cls=ProductModel)

# Assuming QueryHandler in your library is Generic[TQuery] and execute returns Any
# The actual result type (Optional[Dict[str, Any]]) is then specified on the execute method.
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs) -> bool:
        # Example: Check if the current context can view this product.
        # This is a simplified check. Real checks might involve user roles, resource ownership etc.
        # await self.auth_service.check_permission(subject_id=subject_id, required_permissions=[...])
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        # If not authorized: raise AuthorizationError("Not authorized to view this product.")
        return True # For this example, always allow

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # subject_id = kwargs.get('subject_id')
        # permissions = kwargs.get('permissions')
        product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        if product_model:
            return {
                "id": str(product_model.id), # Ensure UUID is serialized if needed
                "name": product_model.name,
                "description": product_model.description,
                "price": product_model.price,
            }
        return None

3. The QueryBus

The QueryBus (from castlecraft_engineer.abstractions.query_bus.QueryBus) acts as a central dispatcher for queries. It decouples the part of your application that requests data (e.g., an API endpoint) from the handler that retrieves it.

  • Purpose: To route queries to their appropriate handlers.
  • Key Functionalities:
    • Handler Registration: Query Handlers are registered with the QueryBus for specific Query types. This is often done at application startup, typically using a Dependency Injection (DI) container.
      • castlecraft-engineer's QueryBus (when used with ContainerBuilder from common/di.py) allows registration via builder.query_bus.register(QueryType, QueryHandlerType).
    • Handler Resolution: When a query is dispatched, the bus looks up the registered handler for that query's type. castlecraft-engineer's QueryBus resolves handlers from a punq.Container (the DI container).
    • Execution Flow: When query_bus.execute(query, **kwargs) is called (where kwargs can include subject_id, permissions for authorization):
      1. The bus resolves the handler for the type of query.
      2. It calls the handler's authorize(query, **kwargs) method (passing through relevant kwargs).
      3. If authorization succeeds (returns True), it calls the handler's execute(query, **kwargs) method (passing through relevant kwargs).
      4. It returns the result from the handler's execute method.

4. QueryHandlerNotFoundError Exception

If the QueryBus receives a query for which no handler has been registered, it will raise a QueryHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.query_bus and re-exported in castlecraft_engineer.exc.

It's crucial to ensure all your queries have corresponding handlers registered with the bus.

5. Code Example: Wiring Up and Executing

This example shows how to set up the DI container, register the handler, and execute a query via the bus.

import asyncio
import uuid
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession # For type hinting
from sqlmodel import SQLModel, Field # For dummy ProductModel
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.abstractions.query import Query # For GetProductDetailsQuery base
from castlecraft_engineer.abstractions.query_handler import QueryHandler # For GetProductDetailsHandler base
from castlecraft_engineer.database.repository import AsyncModelRepository # For ProductRepository base
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import QueryHandlerNotFoundError, AuthorizationError

# --- Definitions from above examples (for self-contained script) ---
@dataclass(frozen=True) # Assuming Query base class is not generic for TResult
class GetProductDetailsQuery(Query):
    product_id: uuid.UUID

class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float

class ProductRepository(AsyncModelRepository[ProductModel]):
    def __init__(self):
        super().__init__(model_cls=ProductModel)

# Corrected QueryHandler signature
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs) -> bool:
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        return True

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # Example: Simulate finding a product (replace with actual repo call if it were real)
        # For this mock setup, we'll assume it might not find it to test the None path.
        # product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        # if product_model:
        #     return { ... }
        if str(query.product_id) == "00000000-0000-0000-0000-000000000001": # Mock finding one product
            return {"id": str(query.product_id), "name": "Mock Product", "price": 10.99}
        return None
# --- End Definitions ---

async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession for this example if not using a real DB
    class MockAsyncSession:
        # Add methods that might be called by the repository if it were real
        # For get_by_id, session.get might be called.
        async def get(self, model_cls, ident): # Simulate session.get
            print(f"MockSession.get called for {model_cls} with id {ident}")
            if str(ident) == "00000000-0000-0000-0000-000000000001": # Match ID used in handler's mock logic
                return ProductModel(id=ident, name="Mock Product", price=10.99)
            return None
        async def commit(self): print("MockSession: commit called")
        async def rollback(self): print("MockSession: rollback called")
        async def close(self): print("MockSession: close called")
        async def __aenter__(self): return self # For 'async with' if used by repo/handler
        async def __aexit__(self, exc_type, exc, tb): await self.close()

    builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
    builder.register(AsyncSession, instance=MockAsyncSession()) # Register an instance of the mock
    builder.register(ProductRepository) # Register the repository

    # Register QueryBus and the handler
    builder.with_query_bus()
    # The QueryBus instance is available on the builder after with_query_bus()
    # GetProductDetailsHandler depends on AsyncSession, ProductRepository, and AuthorizationService
    builder.query_bus.register(GetProductDetailsQuery, GetProductDetailsHandler)

    container = builder.build()

    # 2. Resolve QueryBus
    query_bus = container.resolve(QueryBus)

    # 3. Create and Execute Query
    product_id_to_query = uuid.UUID("00000000-0000-0000-0000-000000000001") # Use a known mockable ID
    requesting_user_id = "user-abc-123"

    query = GetProductDetailsQuery(product_id=product_id_to_query)

    try:
        print(f"Dispatching query: {query}")
        # Pass subject_id for authorization context
        product_details = await query_bus.execute(query, subject_id=requesting_user_id)
        if product_details:
            print(f"Query executed. Result (product_details): {product_details}")
        else:
            print(f"Query executed. Product with ID {product_id_to_query} not found.")
    except QueryHandlerNotFoundError:
        print(f"Error: No handler found for {type(query).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    asyncio.run(main())

This demonstrates the complete flow for queries: defining the query and its handler, registering them with the DI container and QueryBus, and finally executing the query to retrieve data. The authorize method in the handler provides a crucial hook for permission checks before data access.