Testing Applications¶
castlecraft-engineer
provides a suite of testing utilities located in src/castlecraft_engineer/testing/
to help you write effective unit and integration tests for your application components. These utilities aim to simplify test setup, provide useful mocks, and promote best practices.
This guide covers the purpose and basic usage of these testing helpers, primarily demonstrated with pytest
.
1. Testing Aggregates and Repositories¶
testing.aggregate.BaseAggregateTest[T]
¶
- Purpose: A base class to help test
Aggregate
root business logic. It provides fixtures likeaggregate_id
and helper methods such as_create_aggregate
,_load_aggregate_from_history
,assert_event_recorded
, andassert_no_events_recorded
to facilitate testing aggregate state changes and event emissions in isolation from persistence. - Usage: Inherit from this class when testing your aggregate's methods. Define
aggregate_class
in your test subclass.# from your_domain.aggregates import YourAggregate, YourEvent, AnotherEvent # from castlecraft_engineer.testing.aggregate import BaseAggregateTest # import uuid # class TestYourAggregate(BaseAggregateTest[YourAggregate]): # aggregate_class = YourAggregate # def test_create_new_aggregate(self, aggregate_id: uuid.UUID): # data_for_creation = {"name": "Test Aggregate"} # aggregate = self._create_aggregate(aggregate_id, **data_for_creation) # assert aggregate.id == aggregate_id # assert aggregate.name == "Test Aggregate" # self.assert_event_recorded(aggregate, YourEvent, attributes={"name": "Test Aggregate"}) # def test_perform_action_emits_event(self, aggregate_id: uuid.UUID): # aggregate = self._create_aggregate(aggregate_id, name="Initial Name") # self._clear_uncommitted_events(aggregate) # Clear creation event if not relevant # aggregate.perform_some_action(new_value="Updated Value") # assert aggregate.some_property == "Updated Value" # self.assert_event_recorded(aggregate, AnotherEvent, attributes={"changed_value": "Updated Value"})
testing.aggregate.BaseAggregateRepositoryTest[T, M, Repo]
¶
- Purpose: Provides a base class for testing synchronous
AggregateRepository
implementations. It simplifies mocking thesqlmodel.Session
(orsqlalchemy.orm.Session
) and offers helper methods likesetup_get_by_id_mock
,assert_session_add_called
,assert_session_commit_called
, etc., to verify interactions with the session. - Usage: Inherit from this class to test your concrete synchronous aggregate repositories. Define
repository_class
,aggregate_class
, andorm_model_class
. Test methods will typically use themock_session
fixture.# from unittest.mock import MagicMock # import pytest # import uuid # from castlecraft_engineer.testing.aggregate import BaseAggregateRepositoryTest # from castlecraft_engineer.abstractions.aggregate import Aggregate # from castlecraft_engineer.abstractions.repository import AggregateRepository # # --- Define Your Domain Objects (Example) --- # class YourAggregate(Aggregate): # def __init__(self, id: uuid.UUID, name: str = ""): # super().__init__(id) # self.name = name # # ... other aggregate methods # class YourModel: # Your ORM model representation # def __init__(self, id: uuid.UUID, name: str = ""): # self.id = id # self.name = name # class YourRepository(AggregateRepository[YourAggregate, YourModel]): # def __init__(self, aggregate_cls, model_cls): # super().__init__(aggregate_cls, model_cls) # # Implement save, get_by_id, delete_by_id using synchronous session # def save(self, aggregate: YourAggregate, session: MagicMock): # Session type for test # # Example: Convert aggregate to ORM model and save # orm_model = self._to_orm_model(aggregate) # Assume _to_orm_model exists # session.add(orm_model) # session.commit() # session.refresh(orm_model) # return aggregate # Or void # def _to_orm_model(self, aggregate: YourAggregate) -> YourModel: # return YourModel(id=aggregate.id, name=aggregate.name) # --- End Define Your Domain Objects --- # class TestYourSyncRepository(BaseAggregateRepositoryTest[YourAggregate, YourModel, YourRepository]): # repository_class = YourRepository # aggregate_class = YourAggregate # orm_model_class = YourModel # def test_save_aggregate(self, repository_instance: YourRepository, mock_session: MagicMock, sample_aggregate: YourAggregate): # # Arrange: sample_aggregate is provided by a fixture # # Mock the ORM model that would be added and refreshed # # This step depends heavily on your _to_orm_model or mapping logic # expected_orm_model = YourModel(id=sample_aggregate.id, name=sample_aggregate.name) # # Act # repository_instance.save(sample_aggregate, mock_session) # # Assert # # Use specific model instance for assertion if possible, or use mock.ANY for broader checks # self.assert_session_add_called(mock_session, expected_orm_model) # Compares based on __eq__ # self.assert_session_commit_called(mock_session) # self.assert_session_refresh_called(mock_session, expected_orm_model)
testing.aggregate.BaseAsyncAggregateRepositoryTest[T, M, AsyncRepo]
¶
- Purpose: Provides a base class for testing
AsyncAggregateRepository
implementations. It simplifies mocking thesqlalchemy.ext.asyncio.AsyncSession
and offers helper methods likesetup_get_by_id_mock_async
,assert_session_add_called
(for the synchronousadd
part),assert_session_commit_awaited
, etc., to verify asynchronous interactions with the session. - Usage: Inherit from this class to test your concrete asynchronous aggregate repositories. Define
repository_class
,aggregate_class
, andorm_model_class
. Test methods will typically beasync
and use themock_async_session
fixture.# from unittest.mock import AsyncMock # import pytest # import uuid # from castlecraft_engineer.testing.aggregate import BaseAsyncAggregateRepositoryTest # from castlecraft_engineer.abstractions.aggregate import Aggregate # from castlecraft_engineer.abstractions.repository import AsyncAggregateRepository # from sqlalchemy.ext.asyncio import AsyncSession # For type hinting in real repo # # --- Define Your Domain Objects (Example) --- # class YourAggregate(Aggregate): # Same as above # def __init__(self, id: uuid.UUID, name: str = ""): # super().__init__(id) # self.name = name # class YourModel: # Same as above # def __init__(self, id: uuid.UUID, name: str = ""): # self.id = id # self.name = name # class YourAsyncRepository(AsyncAggregateRepository[YourAggregate, YourModel]): # def __init__(self, aggregate_cls, model_cls): # super().__init__(aggregate_cls, model_cls) # # Implement save, get_by_id, delete_by_id using asynchronous session # async def save(self, aggregate: YourAggregate, session: AsyncMock): # AsyncSession for real, AsyncMock for test # orm_model = self._to_orm_model(aggregate) # session.add(orm_model) # add is sync # await session.commit() # await session.refresh(orm_model) # return aggregate # async def get_by_id(self, aggregate_id: uuid.UUID, session: AsyncMock) -> YourAggregate | None: # orm_model = await session.get(self.model_cls, aggregate_id) # if not orm_model: # return None # return self._to_aggregate(orm_model) # Assume _to_aggregate exists # def _to_orm_model(self, aggregate: YourAggregate) -> YourModel: # return YourModel(id=aggregate.id, name=aggregate.name) # def _to_aggregate(self, model: YourModel) -> YourAggregate: # return YourAggregate(id=model.id, name=model.name) # --- End Define Your Domain Objects --- # class TestYourAsyncRepository(BaseAsyncAggregateRepositoryTest[YourAggregate, YourModel, YourAsyncRepository]): # repository_class = YourAsyncRepository # aggregate_class = YourAggregate # orm_model_class = YourModel # @pytest.mark.asyncio # async def test_save_and_get_aggregate(self, repository_instance: YourAsyncRepository, mock_async_session: AsyncMock, sample_aggregate: YourAggregate): # # Arrange for save # expected_orm_model_on_save = YourModel(id=sample_aggregate.id, name=sample_aggregate.name) # # Act: Save # await repository_instance.save(sample_aggregate, mock_async_session) # # Assert: Save # self.assert_session_add_called(mock_async_session, expected_orm_model_on_save) # self.assert_session_commit_awaited(mock_async_session) # self.assert_session_refresh_awaited(mock_async_session, expected_orm_model_on_save) # # Arrange for get: Reset mocks if necessary or use different session mock for clarity # mock_async_session.reset_mock() # Reset calls from save operation # # Mock the ORM model that would be returned by session.get # mock_orm_instance_for_get = YourModel(id=sample_aggregate.id, name=sample_aggregate.name) # self.setup_get_by_id_mock_async(mock_async_session, mock_orm_instance_for_get) # # Act: Get # fetched_aggregate = await repository_instance.get_by_id(sample_aggregate.id, mock_async_session) # # Assert: Get # mock_async_session.get.assert_awaited_once_with(self.orm_model_class, sample_aggregate.id) # assert fetched_aggregate is not None # assert fetched_aggregate.id == sample_aggregate.id # assert fetched_aggregate.name == sample_aggregate.name
testing.repository.BaseModelRepositoryTest[TSQLModel, SyncRepo]
¶
- Purpose: Provides a base class for testing synchronous
ModelRepository
implementations (fromcastlecraft_engineer.database.repository
). It simplifies mocking thesqlalchemy.orm.Session
and provides helper methods for common session operations likeget
,add
,commit
,refresh
, anddelete
. - Usage: Inherit from this class. Define
repository_class
(yourModelRepository
subclass) andmodel_class
(yourSQLModel
or ORM model). Use fixtures likemock_session
,sample_model_id
,sample_model_instance
, andrepository_instance
. Helper methods includesetup_session_get_mock
,setup_session_execute_mock
, and assertion helpers for session calls.# from unittest.mock import MagicMock # import pytest # from typing import Optional # from sqlmodel import SQLModel, Field # from castlecraft_engineer.testing.repository import BaseModelRepositoryTest # from castlecraft_engineer.database.repository import ModelRepository # from sqlalchemy.orm import Session # For type hinting in real repo # # --- Define Your Domain Objects (Example) --- # class Item(SQLModel, table=True): # id: Optional[int] = Field(default=None, primary_key=True) # name: str # class ItemRepository(ModelRepository[Item]): # # Your custom repository methods can go here if needed # pass # --- End Define Your Domain Objects --- # class TestItemRepository(BaseModelRepositoryTest[Item, ItemRepository]): # repository_class = ItemRepository # model_class = Item # def test_create_item(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item): # # Act # created_item = repository_instance.create(mock_session, sample_model_instance) # # Assert # self.assert_session_add_called(mock_session, sample_model_instance) # self.assert_session_commit_called(mock_session) # self.assert_session_refresh_called(mock_session, sample_model_instance) # assert created_item == sample_model_instance # def test_get_item_by_id_found(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item): # # Arrange # # sample_model_id is available if needed, but sample_model_instance.id can also be used # item_id = sample_model_instance.id # self.setup_session_get_mock(mock_session, return_value=sample_model_instance) # # Act # found_item = repository_instance.get_by_id(mock_session, item_id) # # Assert # mock_session.get.assert_called_once_with(self.model_class, item_id) # assert found_item == sample_model_instance # def test_get_all_items(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item): # # Arrange # items_to_return = [sample_model_instance] # self.setup_session_execute_mock(mock_session, return_value=items_to_return) # # Act # all_items = repository_instance.get_all(mock_session) # # Assert # mock_session.execute.assert_called_once() # Further assertion on statement if needed # assert all_items == items_to_return
testing.repository.BaseAsyncModelRepositoryTest[TSQLModel, AsyncRepo]
¶
- Purpose: Provides a base class for testing
AsyncModelRepository
implementations (fromcastlecraft_engineer.database.repository
). It simplifies mocking thesqlalchemy.ext.asyncio.AsyncSession
and provides helper methods for common asynchronous session operations. - Usage: Inherit from this class. Define
repository_class
(yourAsyncModelRepository
subclass) andmodel_class
(yourSQLModel
or ORM model). Use fixtures likemock_async_session
,sample_model_id
,sample_model_instance
, andrepository_instance
. Test methods will typically beasync
. Helper methods includesetup_session_get_mock_async
,setup_session_execute_mock_async
, and assertion helpers for awaited session calls.# from unittest.mock import AsyncMock # import pytest # from typing import Optional # from sqlmodel import SQLModel, Field # from castlecraft_engineer.testing.repository import BaseAsyncModelRepositoryTest # from castlecraft_engineer.database.repository import AsyncModelRepository # from sqlalchemy.ext.asyncio import AsyncSession # For type hinting in real repo # # --- Define Your Domain Objects (Example) --- # class Product(SQLModel, table=True): # id: Optional[int] = Field(default=None, primary_key=True) # name: str # class ProductAsyncRepository(AsyncModelRepository[Product]): # # Your custom repository methods can go here if needed # pass # --- End Define Your Domain Objects --- # class TestProductAsyncRepository(BaseAsyncModelRepositoryTest[Product, ProductAsyncRepository]): # repository_class = ProductAsyncRepository # model_class = Product # @pytest.mark.asyncio # async def test_create_product(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product): # # Act # created_product = await repository_instance.create(mock_async_session, sample_model_instance) # # Assert # self.assert_session_add_called(mock_async_session, sample_model_instance) # add is sync # self.assert_session_commit_awaited(mock_async_session) # self.assert_session_refresh_awaited(mock_async_session, sample_model_instance) # assert created_product == sample_model_instance # @pytest.mark.asyncio # async def test_get_product_by_id_found(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product): # # Arrange # product_id = sample_model_instance.id # self.setup_session_get_mock_async(mock_async_session, return_value=sample_model_instance) # # Act # found_product = await repository_instance.get_by_id(mock_async_session, product_id) # # Assert # mock_async_session.get.assert_awaited_once_with(self.model_class, product_id) # assert found_product == sample_model_instance # @pytest.mark.asyncio # async def test_get_all_products(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product): # # Arrange # products_to_return = [sample_model_instance] # self.setup_session_execute_mock_async(mock_async_session, return_value=products_to_return) # # Act # all_products = await repository_instance.get_all(mock_async_session) # # Assert # mock_async_session.execute.assert_awaited_once() # Further assertion on statement if needed # assert all_products == products_to_return
2. Testing Command Bus (testing.command_bus
)¶
command_bus_instance
(pytest fixture)¶
- Purpose: Provides a fully initialized
CommandBus
instance with a cleanpunq.Container
for each test. This fixture usesclean_container_for_bus_tests
to ensure test isolation. - Related Dummies:
DummyCommand
,DummyCommandHandler
,AnotherDummyCommand
,AnotherDummyCommandHandler
are provided to test the bus's registration and dispatch mechanisms without needing real application commands/handlers. - Usage: Use this fixture to test if the command bus correctly dispatches commands to registered handlers.
import pytest from unittest.mock import AsyncMock from castlecraft_engineer.abstractions.command_bus import CommandBus from castlecraft_engineer.testing.command_bus import ( command_bus_instance, # pytest fixture DummyCommand, DummyCommandHandler, clean_container_for_bus_tests # Also a fixture, used by command_bus_instance ) @pytest.mark.asyncio async def test_command_bus_dispatches_to_handler(command_bus_instance: CommandBus, mocker): # DummyCommandHandler.execute is an AsyncMock by default if not overridden # For this test, let's ensure it's an AsyncMock to check calls mock_execute = AsyncMock(return_value="dummy_result") mocker.patch.object(DummyCommandHandler, "execute", mock_execute) # Register the handler (command_bus_instance has a DI container) command_bus_instance.container.register(DummyCommandHandler) # Register the handler type command_bus_instance.register_handler(DummyCommand, DummyCommandHandler) cmd = DummyCommand(data="test_data") result = await command_bus_instance.execute(cmd) mock_execute.assert_awaited_once_with(cmd) assert result == "dummy_result"
3. Testing Command Handlers (testing.command_handler
)¶
BaseCommandHandlerTest
¶
- Purpose: A base class designed to simplify testing of individual
CommandHandler
implementations. It often pre-initializes common mocks for dependencies likeAsyncSession
, an aggregate repository, and an event publisher. - Usage: Inherit from this class. Your test methods can then focus on the handler's
execute
andauthorize
logic, using the provided mocks to control dependency behavior and make assertions.# from unittest.mock import AsyncMock # import pytest # from castlecraft_engineer.testing.command_handler import BaseCommandHandlerTest # from your_app.commands import YourActualCommand # from your_app.handlers import YourActualCommandHandler # from your_app.models import YourAggregate # from your_app.repositories import YourAggregateRepository # from castlecraft_engineer.abstractions.event_bus import EventBus # class TestYourActualCommandHandler(BaseCommandHandlerTest): # @pytest.mark.asyncio # async def test_execute_creates_and_saves_aggregate(self): # # self.mock_session, self.mock_repo, self.mock_event_bus are available from BaseCommandHandlerTest # # self.mock_auth_service is also available # # Configure mock_repo.save to do nothing or return a value if needed # self.mock_repo.save = AsyncMock() # self.mock_event_bus.publish_batch = AsyncMock() # handler = YourActualCommandHandler( # session=self.mock_session, # repo=self.mock_repo, # event_bus=self.mock_event_bus, # auth_service=self.mock_auth_service # ) # command = YourActualCommand(name="Test Product") # await handler.authorize(command) # Assuming it passes or mock auth_service # result_id = await handler.execute(command) # self.mock_repo.save.assert_awaited_once() # # Assert that the first argument to save was an instance of YourAggregate # saved_aggregate = self.mock_repo.save.call_args[0][0] # assert isinstance(saved_aggregate, YourAggregate) # assert saved_aggregate.name == "Test Product" # self.mock_event_bus.publish_batch.assert_awaited_once() # assert result_id is not None
4. Testing Event Bus (testing.event_bus
)¶
EventBusTestHelper
¶
- Purpose: A helper class to facilitate testing interactions with the
EventBus
. It allows for easy registration of mock event handlers and assertions on published events. - Usage: Instantiate the helper, subscribe mock handlers, publish events through the bus, and then assert that the mock handlers were called with the correct event.
import pytest from unittest.mock import AsyncMock from castlecraft_engineer.abstractions.event import Event from castlecraft_engineer.abstractions.event_bus import EventBus from castlecraft_engineer.testing.event_bus import EventBusTestHelper class MyTestEvent(Event): data: str @pytest.mark.asyncio async def test_event_bus_with_helper(): helper = EventBusTestHelper() event_bus = helper.event_bus # Get the EventBus instance from the helper mock_handler_one = AsyncMock() helper.subscribe_mock_handler(MyTestEvent, mock_handler_one) test_event = MyTestEvent(data="event_payload") await event_bus.publish(test_event) mock_handler_one.assert_awaited_once_with(test_event)
5. Testing Event Stream Consumers (testing.event_consumer
)¶
BaseEventStreamConsumerTest
¶
- Purpose: Provides a base testing class for
EventStreamConsumer
implementations. It might offer mocks for external event sources or the internalEventBus
that the consumer publishes to. - Usage: Inherit from this class to test your consumer's logic for receiving, transforming, and dispatching events.
# import pytest # from unittest.mock import AsyncMock # from castlecraft_engineer.testing.event_consumer import BaseEventStreamConsumerTest # from your_app.consumers import YourEventConsumer # Your concrete consumer # from your_app.events import ExternalEventType, InternalEventType # class TestYourEventConsumer(BaseEventStreamConsumerTest): # @pytest.mark.asyncio # async def test_process_message_publishes_to_internal_bus(self): # self.mock_event_bus is available from BaseEventStreamConsumerTest # consumer = YourEventConsumer(event_bus=self.mock_event_bus) # raw_external_message = {"id": "123", "payload": "data"} # Example external message # # Simulate the consumer's internal message processing logic # # This depends on how YourEventConsumer is structured # # For example, if it has a method _process_raw_message: # # await consumer._process_raw_message(raw_external_message) # # Or, if testing a higher-level method that calls the bus: # # await consumer.handle_incoming_message(raw_external_message) # # Assert that the internal event bus's publish method was called with the correct InternalEventType # self.mock_event_bus.publish.assert_awaited_once() # published_event = self.mock_event_bus.publish.call_args[0][0] # assert isinstance(published_event, InternalEventType) # assert published_event.original_id == "123"
6. Testing Event Handlers (testing.event_handler
)¶
BaseEventHandlerTest
¶
- Purpose: A base class for testing individual
EventHandler
implementations. It might provide common setup or mocks for dependencies the handler might have (e.g., services, repositories). - Usage: Inherit from this class. Your test methods should focus on the handler's
handle
logic, verifying that it performs the correct actions based on the received event.# import pytest # from unittest.mock import AsyncMock # from castlecraft_engineer.testing.event_handler import BaseEventHandlerTest # from your_app.events import YourAppEvent # from your_app.handlers import YourAppEventHandler # Your concrete event handler # from your_app.services import NotificationService # Example dependency # class TestYourAppEventHandler(BaseEventHandlerTest): # @pytest.mark.asyncio # async def test_handle_sends_notification(self): # mock_notification_service = AsyncMock(spec=NotificationService) # handler = YourAppEventHandler(notification_service=mock_notification_service) # event_data = YourAppEvent(user_id="user1", message="Hello!") # await handler.handle(event_data) # mock_notification_service.send_user_notification.assert_awaited_once_with( # user_id="user1", content="Hello!" # )
7. Mocking External Event Publishers (testing.mock_event_publisher
)¶
MockExternalEventPublisher
(async) and MockExternalEventPublisherSync
(sync)¶
- Purpose: These are mock implementations of the
ExternalEventPublisher
abstract class. They allow you to test components that publish events to external systems (like Kafka, RabbitMQ) without making actual network calls. They record published events in apublished_events
list for later assertion. - Usage: Inject the appropriate mock publisher into the component under test. After the component performs an action that should publish an event, assert the content and number of events in
mock_publisher.published_events
.import pytest from castlecraft_engineer.testing.mock_event_publisher import MockExternalEventPublisher from castlecraft_engineer.abstractions.event import Event class MyEvent(Event): content: str class MyServiceThatPublishesExternally: def __init__(self, publisher: MockExternalEventPublisher): # Type hint to ExternalEventPublisher in real code self.publisher = publisher async def do_work_and_publish(self, data: str): event_to_publish = MyEvent(content=data) await self.publisher.publish(event_to_publish) @pytest.mark.asyncio async def test_service_publishes_event_externally(): mock_publisher = MockExternalEventPublisher() service = MyServiceThatPublishesExternally(publisher=mock_publisher) await service.do_work_and_publish("external_payload") assert len(mock_publisher.published_events) == 1 published_event = mock_publisher.published_events[0] assert isinstance(published_event, MyEvent) assert published_event.content == "external_payload"
8. Testing Query Bus (testing.query_bus
)¶
query_bus_helper
(pytest fixture)¶
- Purpose: Similar to
command_bus_instance
, this fixture provides aQueryBus
instance with a cleanpunq.Container
for isolated testing of query dispatch and handler registration. - Usage: Use this fixture to test if the query bus correctly dispatches queries to registered query handlers and returns their results.
import pytest from unittest.mock import AsyncMock from castlecraft_engineer.abstractions.query_bus import QueryBus from castlecraft_engineer.testing.query_bus import ( query_bus_helper, # pytest fixture DummyQuery, DummyQueryHandler ) @pytest.mark.asyncio async def test_query_bus_dispatches_to_handler(query_bus_helper: QueryBus, mocker): # DummyQueryHandler.execute is an AsyncMock by default expected_result = {"data": "query_result"} mock_execute = AsyncMock(return_value=expected_result) mocker.patch.object(DummyQueryHandler, "execute", mock_execute) query_bus_helper.container.register(DummyQueryHandler) query_bus_helper.register_handler(DummyQuery, DummyQueryHandler) query = DummyQuery(param="search_term") result = await query_bus_helper.execute(query) mock_execute.assert_awaited_once_with(query) assert result == expected_result
9. Testing Query Handlers (testing.query_handler
)¶
BaseQueryHandlerTest
¶
- Purpose: A base class for testing individual
QueryHandler
implementations. It typically provides common mocks, such as anAsyncSession
, which query handlers often need to fetch data. - Usage: Inherit from this class. Your tests should focus on the handler's
execute
andauthorize
logic, verifying that it fetches and returns the correct data based on the query parameters and that authorization checks are performed.# from unittest.mock import AsyncMock # import pytest # from castlecraft_engineer.testing.query_handler import BaseQueryHandlerTest # from your_app.queries import YourActualQuery # from your_app.handlers import YourActualQueryHandler # from your_app.dto import YourQueryResultDTO # from your_app.repositories import YourReadModelRepository # Example dependency # class TestYourActualQueryHandler(BaseQueryHandlerTest): # @pytest.mark.asyncio # async def test_execute_returns_correct_data(self): # self.mock_session and self.mock_auth_service are available # mock_read_repo = AsyncMock(spec=YourReadModelRepository) # expected_data = YourQueryResultDTO(id="123", name="Test Data") # mock_read_repo.fetch_data_by_query_params = AsyncMock(return_value=expected_data) # handler = YourActualQueryHandler( # session=self.mock_session, # read_repo=mock_read_repo, # auth_service=self.mock_auth_service # ) # query = YourActualQuery(filter_param="test") # await handler.authorize(query) # Assuming it passes or mock auth_service # result = await handler.execute(query) # mock_read_repo.fetch_data_by_query_params.assert_awaited_once_with(self.mock_session, "test") # assert result == expected_data
10. Mocking Authorization (testing.authorization
)¶
MockAuthorizationService
¶
- Purpose: A mock implementation of the
AuthorizationService
abstract base class. It allows you to easily control the outcome of permission checks in your tests without needing a full-fledged authorization setup. - Usage: Inject
MockAuthorizationService
into your handlers or services under test. You can configure it to allow all permissions, deny all, or mock specific permission checks using standard mocking techniques (e.g.,mocker.patch.object
on itscheck_permission
method). TheBaseCommandHandlerTest
andBaseQueryHandlerTest
often provide an instance asself.mock_auth_service
.import pytest from unittest.mock import AsyncMock from castlecraft_engineer.testing.authorization import MockAuthorizationService from castlecraft_engineer.authorization.permission import Permission from castlecraft_engineer.authorization.types import Action, Resource, Scope from castlecraft_engineer.exc import AuthorizationError CAN_DO_THING = Permission(action=Action.CREATE, resource=Resource("THING"), scope=Scope.ANY) class ServiceWithAuth: def __init__(self, auth_service: MockAuthorizationService): # Type hint to AuthorizationService in real code self.auth_service = auth_service async def do_something_requiring_permission(self): await self.auth_service.check_permission(required_permissions=[CAN_DO_THING]) return "Thing done!" @pytest.mark.asyncio async def test_service_with_auth_allowed(mocker): mock_auth_service = MockAuthorizationService() # Default MockAuthorizationService allows all, or you can be explicit: mocker.patch.object(mock_auth_service, 'check_permission', AsyncMock()) service = ServiceWithAuth(auth_service=mock_auth_service) result = await service.do_something_requiring_permission() assert result == "Thing done!" mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING]) @pytest.mark.asyncio async def test_service_with_auth_denied(mocker): mock_auth_service = MockAuthorizationService() mocker.patch.object(mock_auth_service, 'check_permission', AsyncMock(side_effect=AuthorizationError("Permission denied"))) service = ServiceWithAuth(auth_service=mock_auth_service) with pytest.raises(AuthorizationError, match="Permission denied"): await service.do_something_requiring_permission() mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING])
By leveraging these testing utilities, you can write more focused, maintainable, and reliable tests for your castlecraft-engineer
based applications.
11. Testing Event-Sourced Components with InMemoryEventStore
¶
For testing aggregates, repositories, or command handlers that interact with an EventStore
, Castlecraft Engineer provides InMemoryEventStore
located in castlecraft_engineer.testing.event_store
.
This in-memory implementation of the EventStore
abstraction is perfect for creating fast, isolated unit and integration tests without needing an external database or event store system.
Key Features:¶
- Full
EventStore
Implementation: Behaves like a real event store for appending, loading, and version checking. - Optimistic Concurrency: Correctly handles
expected_version
and raisesEventStoreConflictError
on mismatches. - Test Helpers:
clear()
: Asynchronously clears all stored streams and versions, useful for test isolation (e.g., inpytest
fixtures).get_stream(aggregate_id)
: Asynchronously returns a copy of the event stream for a specific aggregate, useful for direct assertions on stored events.
Example Usage:¶
Here's a basic example of how you might use InMemoryEventStore
in a pytest
test:
import pytest
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List
from castlecraft_engineer.abstractions.event import Event
from castlecraft_engineer.testing import InMemoryEventStore
# Define a sample event for testing purposes
@dataclass(frozen=True)
class ItemCreated(Event):
item_id: uuid.UUID
name: str
event_version: int = 1
occurred_on: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
@classmethod
def event_type_name(cls) -> str:
return "item.created"
@pytest.fixture
async def event_store() -> InMemoryEventStore[uuid.UUID]:
store = InMemoryEventStore[uuid.UUID]()
yield store
await store.clear() # Clean up after test
@pytest.mark.asyncio
async def test_item_creation_events(event_store: InMemoryEventStore[uuid.UUID]):
item_id = uuid.uuid4()
create_event = ItemCreated(item_id=item_id, name="Test Item")
# Append event for a new aggregate (expected_version = -1)
await event_store.append_events(item_id, -1, [create_event])
# Verify current version
version = await event_store.get_current_version(item_id)
assert version == 0 # Version is 0-indexed (sequence of last event)
# Load events and verify
loaded_events = await event_store.load_events(item_id)
assert len(loaded_events) == 1
assert loaded_events[0] == create_event
# Use get_stream for direct inspection if needed
stream_copy = await event_store.get_stream(item_id)
assert stream_copy == [create_event]
By using InMemoryEventStore
, you can thoroughly test the event-sourcing aspects of your application components in an isolated and efficient manner.