Queries, Query Handlers, and Query Bus
In a CQRS (Command Query Responsibility Segregation) architecture, queries retrieve data from the system without causing any state changes. castlecraft-engineer provides a structured approach to defining, handling, and dispatching these queries.
1. The Query Base Class
All queries in your application should inherit from the Query base class, located at castlecraft_engineer.abstractions.query.Query.
- Purpose: A Query is a simple data structure (often a dataclass) that encapsulates all the parameters required to fetch specific data. It is a message that expresses a request for information.
- Characteristics:
  - Typically named to reflect the data being requested (e.g., GetUserByIdQuery, ListActiveProductsQuery).
  - Should be immutable once created (e.g., using @dataclass(frozen=True)).
  - Carries data (parameters for the query) but no behavior.
  - Crucially, executing a query should never modify system state.
  - The base Query class can be generic, Query[TResult], allowing you to specify the expected result type of the query.
# Example: Defining a Query
from dataclasses import dataclass
import uuid

from castlecraft_engineer.abstractions.query import Query


# The base class is used without a type parameter here. If your Query base
# class is not generic, or the result type is declared by the handler only,
# this is all that is needed.
@dataclass(frozen=True)
class GetProductDetailsQuery(Query):
    product_id: uuid.UUID
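The immutability characteristic listed above can be checked with the standard library alone. The following is a minimal sketch, not the library's code: the Query class here is a hypothetical empty stand-in for the real base class so the snippet runs without castlecraft-engineer installed, and try_to_mutate is an illustrative helper.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass


class Query:
    """Hypothetical stand-in for castlecraft_engineer.abstractions.query.Query."""


@dataclass(frozen=True)
class GetProductDetailsQuery(Query):
    product_id: uuid.UUID


def try_to_mutate(query: GetProductDetailsQuery) -> bool:
    """Return True if assigning to a field was rejected, False if it succeeded."""
    try:
        query.product_id = uuid.uuid4()  # frozen dataclasses refuse assignment
    except FrozenInstanceError:
        return True
    return False


query = GetProductDetailsQuery(product_id=uuid.UUID(int=1))
rejected = try_to_mutate(query)

# Frozen dataclasses compare by value and are hashable, so two queries with
# the same parameters are interchangeable (useful for caching or deduplication).
same = query == GetProductDetailsQuery(product_id=uuid.UUID(int=1))
```

Because the instance cannot change after construction, a handler can rely on the parameters it receives being exactly those the caller dispatched.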
2. The QueryHandler Base Class
For every query, there should be a corresponding QueryHandler. Query handlers contain the logic to process a query and return the requested data. They inherit from castlecraft_engineer.abstractions.query_handler.QueryHandler[TQuery, TResult].
- Purpose: To encapsulate the logic required to execute a specific query. This often involves interacting with repositories or other data sources.
- Key Methods:
  - execute(self, query: TQuery, **kwargs) -> TResult (or async execute): This is the main method where the query's logic is implemented. It receives the query object and returns the fetched data (TResult). The **kwargs can receive context like subject_id or permissions from the bus.
  - authorize(self, query: TQuery, subject_id: Optional[str] = None, **kwargs) -> bool (or async authorize): This method is called by the QueryBus before execute. It checks whether the current context (e.g., user, permissions) is authorized to execute the given query and access the requested data. It should return True if authorized and raise an AuthorizationError if authorization fails.
# Example: Defining a QueryHandler
import uuid  # Needed by the ProductModel example
from typing import Optional, Dict, Any

from sqlalchemy.ext.asyncio import AsyncSession  # Assuming async operation
from sqlmodel import SQLModel, Field  # For dummy ProductModel

from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.database.repository import AsyncModelRepository  # Example for fetching data
from castlecraft_engineer.authorization.base_service import AuthorizationService  # Example
from castlecraft_engineer.exc import AuthorizationError  # Standard error

# Assuming GetProductDetailsQuery is defined as above


# Dummy ProductModel for the example
class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float


# Dummy ProductRepository for the example
class ProductRepository(AsyncModelRepository[ProductModel]):  # TModel is ProductModel
    def __init__(self):
        super().__init__(model_cls=ProductModel)


# Assuming QueryHandler in your library is Generic[TQuery] and execute returns Any.
# The actual result type (Optional[Dict[str, Any]]) is then specified on the execute method.
class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs) -> bool:
        # Example: Check if the current context can view this product.
        # This is a simplified check. Real checks might involve user roles, resource ownership etc.
        # await self.auth_service.check_permission(subject_id=subject_id, required_permissions=[...])
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        # If not authorized: raise AuthorizationError("Not authorized to view this product.")
        return True  # For this example, always allow

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # subject_id = kwargs.get('subject_id')
        # permissions = kwargs.get('permissions')
        product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        if product_model:
            return {
                "id": str(product_model.id),  # Ensure UUID is serialized if needed
                "name": product_model.name,
                "description": product_model.description,
                "price": product_model.price,
            }
        return None
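The handler above always authorizes. As a hedged illustration of the failing path, the sketch below shows an authorize method that raises when the caller lacks a permission. Everything in it is an assumption made for illustration: AuthorizationError is a local stand-in for castlecraft_engineer.exc.AuthorizationError, ViewProductAuthorizer is a hypothetical class, the permission string "product:read" is invented, and permissions is assumed to arrive in kwargs as a list of strings.

```python
import asyncio
from typing import Any, Optional


class AuthorizationError(Exception):
    """Local stand-in for castlecraft_engineer.exc.AuthorizationError."""


class ViewProductAuthorizer:
    """Hypothetical holder for the rule a handler's authorize() could apply."""

    REQUIRED_PERMISSION = "product:read"  # invented permission name

    async def authorize(self, query: Any, subject_id: Optional[str] = None, **kwargs: Any) -> bool:
        # Assumption: the bus forwards permissions as a list of strings.
        permissions = kwargs.get("permissions") or []
        if subject_id is None:
            raise AuthorizationError("Anonymous callers may not view products.")
        if self.REQUIRED_PERMISSION not in permissions:
            raise AuthorizationError(
                f"Subject {subject_id} lacks {self.REQUIRED_PERMISSION}."
            )
        return True  # Authorized: the bus may go on to call execute()


async def is_allowed(subject_id: Optional[str], permissions: list) -> bool:
    """Convenience wrapper that converts the exception into a boolean."""
    try:
        return await ViewProductAuthorizer().authorize(
            object(), subject_id=subject_id, permissions=permissions
        )
    except AuthorizationError:
        return False
```

Raising rather than returning False keeps the reason for the refusal attached to the error, which the caller can log or map to an HTTP 403 response.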
3. The QueryBus
The QueryBus (from castlecraft_engineer.abstractions.query_bus.QueryBus) acts as a central dispatcher for queries. It decouples the part of your application that requests data (e.g., an API endpoint) from the handler that retrieves it.
- Purpose: To route queries to their appropriate handlers.
- Key Functionalities:
  - Handler Registration: Query handlers are registered with the QueryBus for specific Query types. This is often done at application startup, typically using a Dependency Injection (DI) container. castlecraft-engineer's QueryBus (when used with ContainerBuilder from common/di.py) allows registration via builder.query_bus.register(QueryType, QueryHandlerType).
  - Handler Resolution: When a query is dispatched, the bus looks up the registered handler for that query's type. castlecraft-engineer's QueryBus resolves handlers from a punq.Container (the DI container).
  - Execution Flow: When query_bus.execute(query, **kwargs) is called (where kwargs can include subject_id and permissions for authorization):
    - The bus resolves the handler for the type of query.
    - It calls the handler's authorize(query, **kwargs) method (passing through relevant kwargs).
    - If authorization succeeds (returns True), it calls the handler's execute(query, **kwargs) method (passing through relevant kwargs).
    - It returns the result from the handler's execute method.
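The execution flow above can be sketched with a toy dispatcher. This is not the library's QueryBus: ToyQueryBus, HandlerNotFound, PingQuery, and PingHandler are hypothetical names, handlers are built from plain factories instead of a punq container, and raising PermissionError when authorize returns a falsy value is an assumption of this sketch.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


class HandlerNotFound(Exception):
    """Hypothetical stand-in for QueryHandlerNotFoundError."""


class ToyQueryBus:
    def __init__(self) -> None:
        # Maps a query type to a zero-argument factory that builds its handler.
        self._factories: Dict[type, Callable[[], Any]] = {}

    def register(self, query_type: type, handler_factory: Callable[[], Any]) -> None:
        self._factories[query_type] = handler_factory

    async def execute(self, query: Any, **kwargs: Any) -> Any:
        # Step 1: resolve the handler for the concrete type of the query.
        factory = self._factories.get(type(query))
        if factory is None:
            raise HandlerNotFound(type(query).__name__)
        handler = factory()
        # Step 2: authorize first, forwarding context such as subject_id.
        if not await handler.authorize(query, **kwargs):
            raise PermissionError("authorize() did not return True")
        # Steps 3 and 4: execute and hand the result back to the caller.
        return await handler.execute(query, **kwargs)


@dataclass(frozen=True)
class PingQuery:
    message: str


class PingHandler:
    async def authorize(self, query: PingQuery, subject_id: Optional[str] = None, **kwargs: Any) -> bool:
        return subject_id is not None  # any identified caller is allowed

    async def execute(self, query: PingQuery, **kwargs: Any) -> str:
        return f"pong: {query.message}"


async def demo() -> str:
    bus = ToyQueryBus()
    bus.register(PingQuery, PingHandler)
    return await bus.execute(PingQuery("hi"), subject_id="user-abc-123")
```

The caller only ever touches the bus and the query object, which is the decoupling the section describes: swapping PingHandler for another implementation requires changing the registration and nothing else.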
4. QueryHandlerNotFoundError Exception
If the QueryBus receives a query for which no handler has been registered, it raises a QueryHandlerNotFoundError. This exception is defined in castlecraft_engineer.abstractions.query_bus and re-exported in castlecraft_engineer.exc.
It's crucial to ensure all your queries have corresponding handlers registered with the bus.
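One way to catch a missing registration before it surfaces at runtime is a startup or test-time check that compares every query class against the registered handlers. The sketch below is only an illustration under stated assumptions: Query is a local stand-in for the library's base class, the registry is a plain dict that you would maintain alongside your builder.query_bus.register calls (the source does not describe a way to inspect the bus's own registrations), and all_query_types and unhandled_queries are hypothetical helpers.

```python
from typing import Dict, List, Set


class Query:
    """Local stand-in for the library's Query base class."""


class GetUserByIdQuery(Query):
    pass


class ListActiveProductsQuery(Query):
    pass


class GetUserByIdHandler:
    """Placeholder handler class; its body is irrelevant to the check."""


def all_query_types(base: type) -> Set[type]:
    """Collect every direct and indirect subclass of base."""
    found: Set[type] = set()
    stack = list(base.__subclasses__())
    while stack:
        cls = stack.pop()
        if cls not in found:
            found.add(cls)
            stack.extend(cls.__subclasses__())
    return found


def unhandled_queries(registry: Dict[type, type], base: type = Query) -> List[str]:
    """Return the sorted names of query classes that have no registered handler."""
    return sorted(cls.__name__ for cls in all_query_types(base) if cls not in registry)


# Hypothetical registry mirroring what was registered with the bus.
registry = {GetUserByIdQuery: GetUserByIdHandler}
missing = unhandled_queries(registry)
```

Running such a check in a unit test turns a forgotten registration into a build failure instead of a QueryHandlerNotFoundError in production. Note that it only sees query classes whose modules have already been imported.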
5. Code Example: Wiring Up and Executing
This example shows how to set up the DI container, register the handler, and execute a query via the bus.
import asyncio
import uuid
from dataclasses import dataclass  # Required by the @dataclass decorator below
from typing import Optional, Dict, Any

from sqlalchemy.ext.asyncio import AsyncSession  # For type hinting
from sqlmodel import SQLModel, Field  # For dummy ProductModel

from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.abstractions.query import Query  # For GetProductDetailsQuery base
from castlecraft_engineer.abstractions.query_handler import QueryHandler  # For GetProductDetailsHandler base
from castlecraft_engineer.database.repository import AsyncModelRepository  # For ProductRepository base
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService  # For example
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.exc import QueryHandlerNotFoundError, AuthorizationError


# --- Definitions from above examples (for self-contained script) ---
@dataclass(frozen=True)  # Assuming Query base class is not generic for TResult
class GetProductDetailsQuery(Query):
    product_id: uuid.UUID


class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float


class ProductRepository(AsyncModelRepository[ProductModel]):
    def __init__(self):
        super().__init__(model_cls=ProductModel)


class GetProductDetailsHandler(QueryHandler[GetProductDetailsQuery]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, auth_service: AuthorizationService):
        self.session = session
        self.product_repo = product_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetProductDetailsQuery, subject_id: Optional[str] = None, **kwargs) -> bool:
        print(f"Authorizing GetProductDetailsQuery for product_id: {query.product_id} by subject: {subject_id}")
        return True

    async def execute(self, query: GetProductDetailsQuery, **kwargs) -> Optional[Dict[str, Any]]:
        print(f"Executing GetProductDetailsQuery for product_id: {query.product_id}")
        # This handler mocks the lookup instead of calling the repository.
        # A real implementation would do something like:
        # product_model = await self.product_repo.get_by_id(self.session, query.product_id)
        # if product_model:
        #     return { ... }
        if str(query.product_id) == "00000000-0000-0000-0000-000000000001":  # Mock finding one product
            return {"id": str(query.product_id), "name": "Mock Product", "price": 10.99}
        return None  # Any other ID exercises the "not found" path
# --- End Definitions ---


async def main():
    # 1. Configure DI Container
    builder = ContainerBuilder()

    # Register a dummy AsyncSession for this example if not using a real DB
    class MockAsyncSession:
        # Add methods that might be called by the repository if it were real.
        # For get_by_id, session.get might be called.
        async def get(self, model_cls, ident):  # Simulate session.get
            print(f"MockSession.get called for {model_cls} with id {ident}")
            if str(ident) == "00000000-0000-0000-0000-000000000001":  # Match ID used in handler's mock logic
                return ProductModel(id=ident, name="Mock Product", price=10.99)
            return None

        async def commit(self): print("MockSession: commit called")
        async def rollback(self): print("MockSession: rollback called")
        async def close(self): print("MockSession: close called")
        async def __aenter__(self): return self  # For 'async with' if used by repo/handler
        async def __aexit__(self, exc_type, exc, tb): await self.close()

    builder.register(AuthorizationService, AllowAllAuthorizationService)  # Example auth service
    builder.register(AsyncSession, instance=MockAsyncSession())  # Register an instance of the mock
    builder.register(ProductRepository)  # Register the repository

    # Register QueryBus and the handler
    builder.with_query_bus()
    # The QueryBus instance is available on the builder after with_query_bus()
    # GetProductDetailsHandler depends on AsyncSession, ProductRepository, and AuthorizationService
    builder.query_bus.register(GetProductDetailsQuery, GetProductDetailsHandler)
    container = builder.build()

    # 2. Resolve QueryBus
    query_bus = container.resolve(QueryBus)

    # 3. Create and Execute Query
    product_id_to_query = uuid.UUID("00000000-0000-0000-0000-000000000001")  # Use a known mockable ID
    requesting_user_id = "user-abc-123"
    query = GetProductDetailsQuery(product_id=product_id_to_query)
    try:
        print(f"Dispatching query: {query}")
        # Pass subject_id for authorization context
        product_details = await query_bus.execute(query, subject_id=requesting_user_id)
        if product_details:
            print(f"Query executed. Result (product_details): {product_details}")
        else:
            print(f"Query executed. Product with ID {product_id_to_query} not found.")
    except QueryHandlerNotFoundError:
        print(f"Error: No handler found for {type(query).__name__}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    asyncio.run(main())
This demonstrates the complete flow for queries: defining the query and its handler, registering them with the DI container and QueryBus, and finally executing the query to retrieve data. The authorize method in the handler provides a crucial hook for permission checks before data access.