Queries, Query Handlers, and Query Bus
In a CQRS (Command Query Responsibility Segregation) architecture, Queries are responsible for retrieving data from the system without causing any state changes. castlecraft-engineer provides a structured approach to define, handle, and dispatch these queries.
1. The Query Base Class
All queries in your application should inherit from the Query base class, located in castlecraft_engineer.abstractions.query.Query.
- Purpose: A Query is a simple data structure (often a dataclass) that encapsulates all the parameters required to fetch specific data. It's a message that expresses a request for information.
- Characteristics:
  - Typically named to reflect the data being requested (e.g., GetUserByIdQuery, ListActiveProductsQuery).
  - Should be immutable once created (e.g., using @dataclass(frozen=True)).
  - Carries data (parameters for the query) but no behavior.
  - Crucially, executing a query should never modify system state.
  - The base Query class can be generic, Query[TResult], allowing you to specify the expected result type of the query.
# Example: Defining a Query
import uuid
from dataclasses import dataclass

from castlecraft_engineer.abstractions.query import Query


# If the Query base class is not generic, or if the result type is declared
# only on the handler, subclass Query without a type parameter.
@dataclass(frozen=True)
class GetProductDetailsQuery(Query):  # No TResult here if base Query isn't generic
    product_id: uuid.UUID
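The immutability and result-type points above can be illustrated with a small, self-contained sketch. The Query class here is a hypothetical stand-in for the library's base class (assumed generic over TResult, as described above) so the snippet runs without castlecraft-engineer installed; GetUserByIdQuery and UserDTO are illustrative names only.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass
from typing import Generic, TypeVar

TResult = TypeVar("TResult")


class Query(Generic[TResult]):
    """Hypothetical stand-in for castlecraft_engineer's Query base class."""


@dataclass(frozen=True)
class UserDTO:
    """Illustrative result type the query is expected to produce."""

    id: uuid.UUID
    name: str


@dataclass(frozen=True)
class GetUserByIdQuery(Query[UserDTO]):
    """Carries only the parameters needed to look up one user."""

    user_id: uuid.UUID


user_id = uuid.uuid4()
query = GetUserByIdQuery(user_id=user_id)

# Frozen dataclasses reject attribute assignment after construction.
try:
    query.user_id = uuid.uuid4()
    was_mutated = True
except FrozenInstanceError:
    was_mutated = False

# Queries built from the same parameters compare equal and are hashable,
# which makes them convenient as cache keys or in test assertions.
same_query = GetUserByIdQuery(user_id=user_id)
```

Because the query is frozen, any attempt to change user_id after construction fails, so a handler can rely on receiving exactly the parameters the caller supplied.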
2. The QueryHandler Base Class
For every query, there should be a corresponding QueryHandler. Query Handlers contain the logic to process a query and return the requested data. They inherit from castlecraft_engineer.abstractions.query_handler.QueryHandler[TQuery, TResult].
- Purpose: To encapsulate the logic required to execute a specific query. This often involves interacting with repositories or other data sources.
- Key Methods:
  - execute(self, query: TQuery, **kwargs) -> TResult (or async execute): This is the main method where the query's logic is implemented. It receives the query object and returns the fetched data (TResult). The **kwargs can receive context like subject_id or permissions from the bus.
  - authorize(self, query: TQuery, subject_id: Optional[str] = None, **kwargs) -> bool (or async authorize): This method is called by the QueryBus before execute. It's responsible for checking whether the current context (e.g., user, permissions) is authorized to execute the given query and access the requested data. If authorization fails, it should raise an AuthorizationError. It should return True if authorized.
# Example: Defining a QueryHandler
import uuid
from typing import Any, Dict, Optional

from sqlalchemy.ext.asyncio import AsyncSession  # Assuming async operation
from sqlmodel import Field, SQLModel  # For dummy ProductModel

from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.authorization.base_service import AuthorizationService  # Example
from castlecraft_engineer.database.repository import AsyncModelRepository  # Example for fetching data
from castlecraft_engineer.exc import AuthorizationError  # Standard error

# Assuming GetProductDetailsQuery is defined as above


# Dummy ProductModel for the example
class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float


# Dummy ProductRepository for the example
class ProductRepository(AsyncModelRepository[ProductModel]):  # TModel is ProductModel
    def __init__(self):
        super().__init__(model_cls=ProductModel)


# Assuming QueryHandler in your library is Generic[TQuery] and execute returns Any.
# The actual result type (Optional[Dict[str, Any]]) is then specified on the execute method.
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(
        self,
        session: AsyncSession,
        product_repo: ProductRepository,
        auth_service: AuthorizationService,
    ):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(
        self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs
    ) -> bool:
        # Example: Check if the current context can view this product.
        # This is a simplified check. Real checks might involve user roles, resource ownership, etc.
        # await self.auth_service.check_permission(subject_id=subject_id, required_permissions=[...])
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        # If not authorized: raise AuthorizationError("Not authorized to view this product.")
        return True  # For this example, always allow

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # subject_id = kwargs.get('subject_id')
        # permissions = kwargs.get('permissions')
        product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        if product_model:
            return {
                "id": str(product_model.id),  # Ensure UUID is serialized if needed
                "name": product_model.name,
                "description": product_model.description,
                "price": product_model.price,
            }
        return None
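The handler above always allows access. As a minimal sketch of the denial path, the snippet below shows an authorize method that raises when a required permission is missing. Everything in it is an assumption made for illustration: AuthorizationError is a local stand-in for castlecraft_engineer.exc.AuthorizationError, GetReportHandler does not subclass the library's QueryHandler, and the "reports:read" permission name is hypothetical.

```python
import asyncio
from typing import Any, Dict, Optional, Sequence


class AuthorizationError(Exception):
    """Hypothetical stand-in for castlecraft_engineer.exc.AuthorizationError."""


class GetReportHandler:
    """Illustrative handler; a real one would subclass the library's QueryHandler."""

    REQUIRED_PERMISSION = "reports:read"  # hypothetical permission name

    async def authorize(
        self, query: Any, subject_id: Optional[str] = None, **kwargs: Any
    ) -> bool:
        # Permissions are assumed to arrive as a keyword argument from the caller.
        permissions: Sequence[str] = kwargs.get("permissions") or ()
        if subject_id is None or self.REQUIRED_PERMISSION not in permissions:
            raise AuthorizationError(f"Subject {subject_id!r} may not read reports.")
        return True

    async def execute(self, query: Any, **kwargs: Any) -> Dict[str, Any]:
        return {"report": "ok", "requested_by": kwargs.get("subject_id")}


async def run(subject_id: Optional[str], permissions: Sequence[str]) -> Any:
    """Mimic the authorize-then-execute order a bus would follow."""
    handler = GetReportHandler()
    query = object()  # placeholder query; its contents are irrelevant here
    try:
        await handler.authorize(query, subject_id=subject_id, permissions=permissions)
    except AuthorizationError as exc:
        return f"denied: {exc}"
    return await handler.execute(query, subject_id=subject_id, permissions=permissions)


allowed = asyncio.run(run("user-1", ["reports:read"]))
denied = asyncio.run(run("user-2", []))
```

Raising instead of returning False keeps the reason for the denial attached to the exception, which the caller can log or translate into an HTTP 403.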
3. The QueryBus
The QueryBus (from castlecraft_engineer.abstractions.query_bus.QueryBus) acts as a central dispatcher for queries. It decouples the part of your application that requests data (e.g., an API endpoint) from the handler that retrieves it.
- Purpose: To route queries to their appropriate handlers.
- Key Functionalities:
  - Handler Registration: Query Handlers are registered with the QueryBus for specific Query types. This is often done at application startup, typically using a Dependency Injection (DI) container. castlecraft-engineer's QueryBus (when used with ContainerBuilder from common/di.py) allows registration via builder.query_bus.register(QueryType, QueryHandlerType).
  - Handler Resolution: When a query is dispatched, the bus looks up the registered handler for that query's type. castlecraft-engineer's QueryBus resolves handlers from a punq.Container (the DI container).
  - Execution Flow: When query_bus.execute(query, **kwargs) is called (where kwargs can include subject_id, permissions for authorization):
    - The bus resolves the handler for the type of query.
    - It calls the handler's authorize(query, **kwargs) method (passing through relevant kwargs).
    - If authorization succeeds (returns True), it calls the handler's execute(query, **kwargs) method (passing through relevant kwargs).
    - It returns the result from the handler's execute method.
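The execution flow above can be sketched with a toy dispatcher. This is not the library's implementation: MiniQueryBus stores handler instances in a plain dict rather than resolving handler types from a punq.Container, QueryHandlerNotFoundError is a local stand-in, and raising PermissionError when authorize returns a falsy value is an assumption of this sketch.

```python
import asyncio
from typing import Any, Dict, List, Type


class QueryHandlerNotFoundError(Exception):
    """Hypothetical stand-in for the library's exception of the same name."""


class MiniQueryBus:
    """Toy dispatcher mapping query types to handler instances."""

    def __init__(self) -> None:
        self._handlers: Dict[Type[Any], Any] = {}

    def register(self, query_type: Type[Any], handler: Any) -> None:
        self._handlers[query_type] = handler

    async def execute(self, query: Any, **kwargs: Any) -> Any:
        # Step 1: resolve the handler for the query's type.
        handler = self._handlers.get(type(query))
        if handler is None:
            raise QueryHandlerNotFoundError(type(query).__name__)
        # Step 2: authorize before touching any data.
        if not await handler.authorize(query, **kwargs):
            raise PermissionError("authorize() did not return True")
        # Steps 3 and 4: execute and return the handler's result.
        return await handler.execute(query, **kwargs)


class PingQuery:
    """Illustrative query with no parameters."""


class PingHandler:
    """Records the order in which the bus calls its methods."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def authorize(self, query: PingQuery, **kwargs: Any) -> bool:
        self.calls.append("authorize")
        return True

    async def execute(self, query: PingQuery, **kwargs: Any) -> str:
        self.calls.append("execute")
        return f"pong for {kwargs.get('subject_id')}"


bus = MiniQueryBus()
handler = PingHandler()
bus.register(PingQuery, handler)
result = asyncio.run(bus.execute(PingQuery(), subject_id="user-abc-123"))
```

The caller only ever sees the bus and the query type, so a handler can be swapped or mocked in tests without touching the code that dispatches the query.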
4. QueryHandlerNotFoundError Exception
If the QueryBus receives a query for which no handler has been registered, it will raise a QueryHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.query_bus and re-exported in castlecraft_engineer.exc.
It's crucial to ensure all your queries have corresponding handlers registered with the bus.
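One way to catch a missing registration early is a startup or test-time check that compares known query classes against the registered ones. The sketch below is an assumption-laden illustration: Query is a local stand-in for the library's base class, and registry is a hypothetical dict of registrations, since how the real QueryBus exposes its registered handlers is not covered here.

```python
from typing import Any, Dict, List, Type


class Query:
    """Hypothetical stand-in for the library's Query base class."""


class GetOrderQuery(Query):
    pass


class ListOrdersQuery(Query):
    pass


class GetOrderHandler:
    """Illustrative handler for GetOrderQuery."""


def find_unhandled_queries(base: Type[Any], registry: Dict[Type[Any], Any]) -> List[str]:
    """Return names of direct subclasses of `base` that have no registered handler."""
    # Note: __subclasses__() lists only direct subclasses that have been imported.
    return sorted(q.__name__ for q in base.__subclasses__() if q not in registry)


# Hypothetical registrations: ListOrdersQuery was forgotten.
registry: Dict[Type[Any], Any] = {GetOrderQuery: GetOrderHandler}
missing = find_unhandled_queries(Query, registry)
```

Running a check like this in a unit test surfaces the gap before a user request triggers QueryHandlerNotFoundError at runtime.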
5. Code Example: Wiring Up and Executing
This example shows how to set up the DI container, register the handler, and execute a query via the bus.
import asyncio
import uuid
from dataclasses import dataclass  # Needed for the @dataclass decorator below
from typing import Any, Dict, Optional

from sqlalchemy.ext.asyncio import AsyncSession  # For type hinting
from sqlmodel import Field, SQLModel  # For dummy ProductModel

from castlecraft_engineer.abstractions.query import Query  # For GetProductDetailsQuery base
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.abstractions.query_handler import QueryHandler  # For GetProductDetailsHandler base
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService  # For example
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.database.repository import AsyncModelRepository  # For ProductRepository base
from castlecraft_engineer.exc import AuthorizationError, QueryHandlerNotFoundError
# --- Definitions from above examples (for self-contained script) ---
@dataclass(frozen=True)  # Assuming Query base class is not generic for TResult
class GetProductDetailsQuery(Query):
    product_id: uuid.UUID


class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float


class ProductRepository(AsyncModelRepository[ProductModel]):
    def __init__(self):
        super().__init__(model_cls=ProductModel)


# Corrected QueryHandler signature
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(
        self,
        session: AsyncSession,
        product_repo: ProductRepository,
        auth_service: AuthorizationService,
    ):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(
        self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs
    ) -> bool:
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        return True

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # Example: Simulate finding a product (replace with actual repo call if it were real).
        # For this mock setup, we'll assume it might not find it to test the None path.
        # product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        # if product_model:
        #     return { ... }
        if str(query.product_id) == "00000000-0000-0000-0000-000000000001":  # Mock finding one product
            return {"id": str(query.product_id), "name": "Mock Product", "price": 10.99}
        return None
# --- End Definitions ---
async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession for this example if not using a real DB
    class MockAsyncSession:
        # Add methods that might be called by the repository if it were real.
        # For get_by_id, session.get might be called.
        async def get(self, model_cls, ident):  # Simulate session.get
            print(f"MockSession.get called for {model_cls} with id {ident}")
            if str(ident) == "00000000-0000-0000-0000-000000000001":  # Match ID used in handler's mock logic
                return ProductModel(id=ident, name="Mock Product", price=10.99)
            return None

        async def commit(self):
            print("MockSession: commit called")

        async def rollback(self):
            print("MockSession: rollback called")

        async def close(self):
            print("MockSession: close called")

        async def __aenter__(self):  # For 'async with' if used by repo/handler
            return self

        async def __aexit__(self, exc_type, exc, tb):
            await self.close()

    builder.register(AuthorizationService, AllowAllAuthorizationService)  # Example auth service
    builder.register(AsyncSession, instance=MockAsyncSession())  # Register an instance of the mock
    builder.register(ProductRepository)  # Register the repository

    # Register QueryBus and the handler
    builder.with_query_bus()
    # The QueryBus instance is available on the builder after with_query_bus().
    # GetProductDetailsHandler depends on AsyncSession, ProductRepository, and AuthorizationService.
    builder.query_bus.register(GetProductDetailsQuery, GetProductDetailsHandler)
    container = builder.build()

    # 2. Resolve QueryBus
    query_bus = container.resolve(QueryBus)

    # 3. Create and Execute Query
    product_id_to_query = uuid.UUID("00000000-0000-0000-0000-000000000001")  # Use a known mockable ID
    requesting_user_id = "user-abc-123"
    query = GetProductDetailsQuery(product_id=product_id_to_query)
    try:
        print(f"Dispatching query: {query}")
        # Pass subject_id for authorization context
        product_details = await query_bus.execute(query, subject_id=requesting_user_id)
        if product_details:
            print(f"Query executed. Result (product_details): {product_details}")
        else:
            print(f"Query executed. Product with ID {product_id_to_query} not found.")
    except QueryHandlerNotFoundError:
        print(f"Error: No handler found for {type(query).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(main())
This demonstrates the complete flow for queries: defining the query and its handler, registering them with the DI container and QueryBus, and finally executing the query to retrieve data. The authorize method in the handler provides a crucial hook for permission checks before data access.