Queries, Query Handlers, and Query Bus¶
In a CQRS (Command Query Responsibility Segregation) architecture, Queries are responsible for retrieving data from the system without causing any state changes. castlecraft-engineer provides a structured approach to define, handle, and dispatch these queries.
1. The Query Base Class¶
All queries in your application should inherit from the Query base class, located in castlecraft_engineer.abstractions.query.Query.
- Purpose: A
Queryis a simple data structure (often a dataclass) that encapsulates all the parameters required to fetch specific data. It's a message that expresses a request for information. - Characteristics:
- Typically named to reflect the data being requested (e.g.,
GetUserByIdQuery,ListActiveProductsQuery). - Should be immutable once created.
- Carry data (parameters for the query) but no behavior.
- Crucially, executing a query should never modify system state.
- Typically named to reflect the data being requested (e.g.,
# Example: Defining a Query
from dataclasses import dataclass
import uuid
from castlecraft_engineer.abstractions.query import Query
@dataclass(frozen=True)
class GetProductDetailsQuery(Query[dict]): # TResult is dict here, representing product details
product_id: uuid.UUID
2. The QueryHandler Base Class¶
For every query, there should be a corresponding QueryHandler. Query Handlers contain the logic to process a query and return the requested data. They inherit from castlecraft_engineer.abstractions.query_handler.QueryHandler[TQuery, TResult].
- Purpose: To encapsulate the logic required to execute a specific query. This often involves interacting with repositories (like
ModelRepositoryorAsyncModelRepositoryfor direct model access, orAggregateRepositoryif querying an aggregate's state) or other data sources. - Key Methods:
execute(self, query: TQuery) -> TResult(orasync executefor asynchronous handlers): This is the main method where the query's logic is implemented. It receives the query object and returns the fetched data (TResult).authorize(self, query: TQuery) -> None(orasync authorize): This method is called by theQueryBusbeforeexecute. It's responsible for checking if the current context (e.g., user, permissions) is authorized to execute the given query and access the requested data. If authorization fails, it should raise anAuthorizationError.
# Example: Defining a QueryHandler
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession # Assuming async operation
from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.database.repository import AsyncModelRepository # Example for fetching data
from castlecraft_engineer.authorization.base_service import AuthorizationService # Example
from castlecraft_engineer.exc import AuthorizationError # Standard error
# Assuming GetProductDetailsQuery is defined as above
# Assuming ProductModel is a SQLModel representing product data
# from your_models.product import ProductModel
class ProductModel(SQLModel, table=True): # Dummy for example
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
name: str
description: Optional[str] = None
price: float
class ProductRepository(AsyncModelRepository[ProductModel, uuid.UUID]):
def __init__(self):
super().__init__(model_cls=ProductModel)
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery, Optional[Dict[str, Any]]]):
def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
self.session = session
self.product_repo = product_repo
self.auth_service = auth_service
async def authorize(self, query: GetProductDetailsQuery) -> None:
# Example: Check if the current context can view this product.
# This is a simplified check. Real checks might involve user roles, resource ownership etc.
# await self.auth_service.check_permission(required_permissions=[('product', 'read', query.product_id)])
print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id}")
# If not authorized: raise AuthorizationError("Not authorized to view this product.")
pass # For this example, always allow
async def execute(self, query: GetProductDetailsQuery) -> Optional[Dict[str, Any]]:
print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
product_model = await self.product_repo.get_by_id(self.session, query.product_id)
if product_model:
return {
"id": product_model.id,
"name": product_model.name,
"description": product_model.description,
"price": product_model.price,
}
return None
3. The QueryBus¶
The QueryBus (from castlecraft_engineer.abstractions.query_bus.QueryBus) acts as a central dispatcher for queries. It decouples the part of your application that requests data (e.g., an API endpoint) from the handler that retrieves it.
- Purpose: To route queries to their appropriate handlers.
- Key Functionalities:
- Handler Registration: Query Handlers are registered with the
QueryBusfor specificQuerytypes. This is often done at application startup, typically using a Dependency Injection (DI) container.castlecraft-engineer'sQueryBususes a decorator for registration:@query_bus.register(QueryType, QueryHandlerType)when used with theContainerBuilderfromcommon/di.py.
- Handler Resolution: When a query is dispatched, the bus looks up the registered handler for that query's type.
castlecraft-engineer'sQueryBusresolves handlers from apunq.Container(the DI container). - Execution Flow: When
query_bus.execute(query)is called:- The bus resolves the handler for the type of
query. - It calls the handler's
authorize(query)method. - If authorization succeeds, it calls the handler's
execute(query)method. - It returns the result from the handler's
executemethod.
- The bus resolves the handler for the type of
- Handler Registration: Query Handlers are registered with the
4. QueryHandlerNotFoundError Exception¶
If the QueryBus receives a query for which no handler has been registered, it will raise a QueryHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.query_bus and re-exported in castlecraft_engineer.exc.
It's crucial to ensure all your queries have corresponding handlers registered with the bus.
5. Code Example: Wiring Up and Executing¶
This example shows how to set up the DI container, register the handler, and execute a query via the bus.
import asyncio
import uuid
from typing import Optional, Dict, Any
from sqlmodel import SQLModel, Field # For dummy ProductModel
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import QueryHandlerNotFoundError, AuthorizationError
# Assuming GetProductDetailsQuery, ProductModel, ProductRepository, and GetProductDetailsHandler are defined as above
# from your_app.queries import GetProductDetailsQuery
# from your_app.handlers import GetProductDetailsHandler
# from your_app.models import ProductModel, ProductRepository
async def main():
# 1. Configure DI Container
builder = ContainerBuilder()
# Register a dummy AsyncSession factory for this example if not using a real DB
class MockAsyncSession:
async def __aenter__(self): return self
async def __aexit__(self, exc_type, exc, tb): pass
async def commit(self): print("MockSession: commit called")
async def rollback(self): print("MockSession: rollback called")
builder.register(AuthorizationService, AllowAllAuthorizationService) # Example auth service
builder.register(MockAsyncSession) # Registering the mock session itself for AsyncSession type hint
builder.register(ProductRepository) # Register the repository
# Register QueryBus and the handler
builder.with_query_bus()
# The QueryBus instance is available on the builder after with_query_bus()
# GetProductDetailsHandler depends on AsyncSession, ProductRepository, and AuthorizationService
builder.query_bus.register(GetProductDetailsQuery, GetProductDetailsHandler)
container = builder.build()
# 2. Resolve QueryBus
query_bus = container.resolve(QueryBus)
# 3. Create and Execute Query
product_id_to_query = uuid.uuid4() # Assume this product exists for the example
# In a real scenario, you might first create a product or have one in the DB.
# For this example, the handler will likely return None if the mock repo doesn't find it.
query = GetProductDetailsQuery(product_id=product_id_to_query)
try:
print(f"Dispatching query: {query}")
product_details = await query_bus.execute(query)
if product_details:
print(f"Query executed. Result (product_details): {product_details}")
else:
print(f"Query executed. Product with ID {product_id_to_query} not found.")
except QueryHandlerNotFoundError:
print(f"Error: No handler found for {type(query).__name__}")
except AuthorizationError as e:
print(f"Authorization Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
if __name__ == "__main__":
# To run this example, ensure classes are defined or imported correctly.
# For simplicity, if you copy-paste, define GetProductDetailsQuery, ProductModel,
# ProductRepository, and GetProductDetailsHandler directly in this script before main().
asyncio.run(main())
This demonstrates the complete flow for queries: defining the query and its handler, registering them with the DI container and QueryBus, and finally executing the query to retrieve data. The authorize method in the handler provides a crucial hook for permission checks before data access.