Testing Applications
castlecraft-engineer provides a suite of testing utilities located in src/castlecraft_engineer/testing/ to help you write effective unit and integration tests for your application components. These utilities aim to simplify test setup, provide useful mocks, and promote best practices.
This guide covers the purpose and basic usage of these testing helpers, primarily demonstrated with pytest.
1. Testing Aggregates and Repositories
testing.aggregate.BaseAggregateTest
- Purpose: No base class with specific methods is documented here under this name. If you have common setup or assertion logic for testing your aggregate's business rules (methods that don't involve persistence), you can create such a base class yourself or test aggregates directly.
- Usage: Focus on testing the aggregate's internal state changes and event recording in isolation.

    # Example: Testing Aggregate Logic (Conceptual)
    # from your_domain.models import YourAggregate, YourEvent

    # def test_your_aggregate_action():
    #     aggregate = YourAggregate.create_new(...)
    #     aggregate.perform_action(...)
    #     assert aggregate.status == ExpectedStatus
    #     assert any(isinstance(event, YourEvent) for event in aggregate.uncommitted_events)
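The conceptual example above can be made concrete with a small stand-in. The following is a minimal sketch, assuming a hypothetical `Order` aggregate and `OrderShipped` event that are not part of castlecraft-engineer; it only illustrates asserting on state changes and recorded events without any persistence. The test functions follow pytest's naming convention but use plain assertions.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class OrderShipped:
    """Hypothetical domain event recorded when an order ships."""
    order_id: str


@dataclass
class Order:
    """Hypothetical aggregate; not part of castlecraft-engineer."""
    id: str
    status: str = "pending"
    uncommitted_events: list = field(default_factory=list)

    def ship(self) -> None:
        # Business rule: an order can only be shipped once.
        if self.status == "shipped":
            raise ValueError("order already shipped")
        self.status = "shipped"
        self.uncommitted_events.append(OrderShipped(order_id=self.id))


def test_ship_changes_status_and_records_event():
    order = Order(id="o-1")
    order.ship()
    assert order.status == "shipped"
    assert order.uncommitted_events == [OrderShipped(order_id="o-1")]


def test_ship_twice_is_rejected():
    order = Order(id="o-2")
    order.ship()
    try:
        order.ship()
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError on second ship()")
    # The rejected call must not record a second event.
    assert len(order.uncommitted_events) == 1
```

Testing the rejected path alongside the happy path checks that a failed business rule leaves both the state and the recorded events untouched.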
testing.aggregate.BaseAggregateRepositoryTest[TID, TAggregate, TModel]
- Purpose: Provides a base class for testing AsyncAggregateRepository implementations. It often helps with setting up an in-memory database (like SQLite) for tests, providing a session, and offering helper methods for repository operations.
- Usage: Inherit from this class to test your concrete aggregate repositories. Focus on save, get_by_id, delete, and optimistic concurrency behavior.

    # import uuid
    # import pytest
    # from sqlalchemy.ext.asyncio import AsyncSession
    # from castlecraft_engineer.testing.aggregate import BaseAggregateRepositoryTest
    # from your_domain.models import YourAggregate, YourModel
    # from your_domain.repositories import YourRepository

    # class TestYourRepository(BaseAggregateRepositoryTest[uuid.UUID, YourAggregate, YourModel]):
    #     repository_class = YourRepository
    #     aggregate_class = YourAggregate  # For creating test instances
    #     model_class = YourModel  # For schema creation
    #
    #     @pytest.mark.asyncio
    #     async def test_save_and_get_aggregate(self, async_session: AsyncSession):
    #         repo = self.repository_class()
    #         aggregate_instance = self.aggregate_class.create_new(name="Test Agg")
    #
    #         await repo.save(aggregate_instance, async_session)
    #         await async_session.commit()
    #
    #         fetched_aggregate = await repo.get_by_id(aggregate_instance.id, async_session)
    #         assert fetched_aggregate is not None
    #         assert fetched_aggregate.id == aggregate_instance.id
    #         assert fetched_aggregate.name == "Test Agg"

(Note: The exact structure of BaseAggregateRepositoryTest might require specific setup like providing the model for schema creation. Refer to its implementation for details.)
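Optimistic concurrency is the least obvious of the behaviors listed above, so a sketch of the shape such a test can take may help. It assumes a hypothetical in-memory repository, a hypothetical `Account` aggregate with a `version` counter, and a hypothetical `ConcurrencyError`; none of these names come from castlecraft-engineer, and a real repository test would run against the session supplied by the base class.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Dict, Optional


class ConcurrencyError(Exception):
    """Hypothetical error raised when a stale copy is saved."""


@dataclass
class Account:
    """Hypothetical aggregate carrying a version counter."""
    id: str
    balance: int = 0
    version: int = 0


class InMemoryAccountRepository:
    """Hypothetical stand-in for a real repository; stores copies by id."""

    def __init__(self) -> None:
        self._rows: Dict[str, Account] = {}

    async def get_by_id(self, account_id: str) -> Optional[Account]:
        row = self._rows.get(account_id)
        # Return a copy so callers cannot mutate stored state directly.
        return replace(row) if row is not None else None

    async def save(self, account: Account) -> None:
        stored = self._rows.get(account.id)
        # Reject the write if another save happened since this copy was loaded.
        if stored is not None and stored.version != account.version:
            raise ConcurrencyError(account.id)
        self._rows[account.id] = replace(account, version=account.version + 1)


async def stale_write_is_rejected() -> bool:
    repo = InMemoryAccountRepository()
    await repo.save(Account(id="a-1"))

    # Two callers load the same version of the aggregate.
    first = await repo.get_by_id("a-1")
    second = await repo.get_by_id("a-1")

    first.balance = 10
    await repo.save(first)  # succeeds and bumps the stored version

    second.balance = 20
    try:
        await repo.save(second)  # still carries the old version
    except ConcurrencyError:
        return True
    return False
```

The pattern to carry over is loading the same aggregate twice, saving one copy, and asserting that saving the other copy fails.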
2. Testing Command Bus (testing.command_bus)
command_bus_instance (pytest fixture)
- Purpose: Provides a fully initialized CommandBus instance with a clean punq.Container for each test. This fixture uses clean_container_for_bus_tests to ensure test isolation.
- Related Dummies: DummyCommand, DummyCommandHandler, AnotherDummyCommand, AnotherDummyCommandHandler are provided to test the bus's registration and dispatch mechanisms without needing real application commands/handlers.
- Usage: Use this fixture to test if the command bus correctly dispatches commands to registered handlers.

    import pytest
    from unittest.mock import AsyncMock

    from castlecraft_engineer.abstractions.command_bus import CommandBus
    from castlecraft_engineer.testing.command_bus import (
        command_bus_instance,  # pytest fixture
        DummyCommand,
        DummyCommandHandler,
        clean_container_for_bus_tests,  # Also a fixture, used by command_bus_instance
    )


    @pytest.mark.asyncio
    async def test_command_bus_dispatches_to_handler(command_bus_instance: CommandBus, mocker):
        # DummyCommandHandler.execute is an AsyncMock by default if not overridden
        # For this test, let's ensure it's an AsyncMock to check calls
        mock_execute = AsyncMock(return_value="dummy_result")
        mocker.patch.object(DummyCommandHandler, "execute", mock_execute)

        # Register the handler (command_bus_instance has a DI container)
        command_bus_instance.container.register(DummyCommandHandler)  # Register the handler type
        command_bus_instance.register_handler(DummyCommand, DummyCommandHandler)

        cmd = DummyCommand(data="test_data")
        result = await command_bus_instance.execute(cmd)

        mock_execute.assert_awaited_once_with(cmd)
        assert result == "dummy_result"
3. Testing Command Handlers (testing.command_handler)
BaseCommandHandlerTest
- Purpose: A base class designed to simplify testing of individual CommandHandler implementations. It often pre-initializes common mocks for dependencies like AsyncSession, an aggregate repository, and an event publisher.
- Usage: Inherit from this class. Your test methods can then focus on the handler's execute and authorize logic, using the provided mocks to control dependency behavior and make assertions.

    # from unittest.mock import AsyncMock
    # import pytest
    # from castlecraft_engineer.testing.command_handler import BaseCommandHandlerTest
    # from your_app.commands import YourActualCommand
    # from your_app.handlers import YourActualCommandHandler
    # from your_app.models import YourAggregate
    # from your_app.repositories import YourAggregateRepository
    # from castlecraft_engineer.abstractions.event_bus import EventBus

    # class TestYourActualCommandHandler(BaseCommandHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_execute_creates_and_saves_aggregate(self):
    #         # self.mock_session, self.mock_repo, self.mock_event_bus are available from BaseCommandHandlerTest
    #         # self.mock_auth_service is also available
    #
    #         # Configure mock_repo.save to do nothing or return a value if needed
    #         self.mock_repo.save = AsyncMock()
    #         self.mock_event_bus.publish_batch = AsyncMock()
    #
    #         handler = YourActualCommandHandler(
    #             session=self.mock_session,
    #             repo=self.mock_repo,
    #             event_bus=self.mock_event_bus,
    #             auth_service=self.mock_auth_service
    #         )
    #         command = YourActualCommand(name="Test Product")
    #
    #         await handler.authorize(command)  # Assuming it passes or mock auth_service
    #         result_id = await handler.execute(command)
    #
    #         self.mock_repo.save.assert_awaited_once()
    #         # Assert that the first argument to save was an instance of YourAggregate
    #         saved_aggregate = self.mock_repo.save.call_args[0][0]
    #         assert isinstance(saved_aggregate, YourAggregate)
    #         assert saved_aggregate.name == "Test Product"
    #
    #         self.mock_event_bus.publish_batch.assert_awaited_once()
    #         assert result_id is not None
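The idea of a base class that pre-initializes mocks can be illustrated by hand with `unittest.mock`. This is only a sketch under assumptions: `HandlerTestBase` and `RenameHandler` are hypothetical names, and the real `BaseCommandHandlerTest` may set up its mocks differently. It shows fresh mocks being created before each test and the handler being checked purely through them.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class HandlerTestBase:
    """Hypothetical base class; creates fresh mocks before each test."""

    def setup_method(self) -> None:
        # pytest calls setup_method before every test method in the class.
        self.mock_session = MagicMock(name="session")
        self.mock_repo = MagicMock(name="repo")
        self.mock_repo.save = AsyncMock()
        self.mock_event_bus = MagicMock(name="event_bus")
        self.mock_event_bus.publish_batch = AsyncMock()


class RenameHandler:
    """Hypothetical handler that saves a payload and publishes one event."""

    def __init__(self, session, repo, event_bus):
        self.session = session
        self.repo = repo
        self.event_bus = event_bus

    async def execute(self, name: str) -> str:
        await self.repo.save({"name": name}, self.session)
        await self.event_bus.publish_batch([("renamed", name)])
        return name


class TestRenameHandler(HandlerTestBase):
    def test_execute_saves_and_publishes(self) -> None:
        handler = RenameHandler(self.mock_session, self.mock_repo, self.mock_event_bus)
        result = asyncio.run(handler.execute("Test Product"))

        assert result == "Test Product"
        self.mock_repo.save.assert_awaited_once_with(
            {"name": "Test Product"}, self.mock_session
        )
        self.mock_event_bus.publish_batch.assert_awaited_once_with(
            [("renamed", "Test Product")]
        )
```

Creating the mocks in `setup_method` rather than at class level keeps recorded calls from leaking between tests.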
4. Testing Event Bus (testing.event_bus)
EventBusTestHelper
- Purpose: A helper class to facilitate testing interactions with the EventBus. It allows for easy registration of mock event handlers and assertions on published events.
- Usage: Instantiate the helper, subscribe mock handlers, publish events through the bus, and then assert that the mock handlers were called with the correct event.

    import pytest
    from unittest.mock import AsyncMock

    from castlecraft_engineer.abstractions.event import Event
    from castlecraft_engineer.abstractions.event_bus import EventBus
    from castlecraft_engineer.testing.event_bus import EventBusTestHelper


    class MyTestEvent(Event):
        data: str


    @pytest.mark.asyncio
    async def test_event_bus_with_helper():
        helper = EventBusTestHelper()
        event_bus = helper.event_bus  # Get the EventBus instance from the helper

        mock_handler_one = AsyncMock()
        helper.subscribe_mock_handler(MyTestEvent, mock_handler_one)

        test_event = MyTestEvent(data="event_payload")
        await event_bus.publish(test_event)

        mock_handler_one.assert_awaited_once_with(test_event)
5. Testing Event Stream Consumers (testing.event_consumer)
BaseEventStreamConsumerTest
- Purpose: Provides a base testing class for EventStreamConsumer implementations. It might offer mocks for external event sources or the internal EventBus that the consumer publishes to.
- Usage: Inherit from this class to test your consumer's logic for receiving, transforming, and dispatching events.

    # import pytest
    # from unittest.mock import AsyncMock
    # from castlecraft_engineer.testing.event_consumer import BaseEventStreamConsumerTest
    # from your_app.consumers import YourEventConsumer  # Your concrete consumer
    # from your_app.events import ExternalEventType, InternalEventType

    # class TestYourEventConsumer(BaseEventStreamConsumerTest):
    #     @pytest.mark.asyncio
    #     async def test_process_message_publishes_to_internal_bus(self):
    #         # self.mock_event_bus is available from BaseEventStreamConsumerTest
    #         consumer = YourEventConsumer(event_bus=self.mock_event_bus)
    #         raw_external_message = {"id": "123", "payload": "data"}  # Example external message
    #
    #         # Simulate the consumer's internal message processing logic
    #         # This depends on how YourEventConsumer is structured
    #         # For example, if it has a method _process_raw_message:
    #         # await consumer._process_raw_message(raw_external_message)
    #         # Or, if testing a higher-level method that calls the bus:
    #         # await consumer.handle_incoming_message(raw_external_message)
    #
    #         # Assert that the internal event bus's publish method was called with the correct InternalEventType
    #         self.mock_event_bus.publish.assert_awaited_once()
    #         published_event = self.mock_event_bus.publish.call_args[0][0]
    #         assert isinstance(published_event, InternalEventType)
    #         assert published_event.original_id == "123"
6. Testing Event Handlers (testing.event_handler)
BaseEventHandlerTest
- Purpose: A base class for testing individual EventHandler implementations. It might provide common setup or mocks for dependencies the handler might have (e.g., services, repositories).
- Usage: Inherit from this class. Your test methods should focus on the handler's handle logic, verifying that it performs the correct actions based on the received event.

    # import pytest
    # from unittest.mock import AsyncMock
    # from castlecraft_engineer.testing.event_handler import BaseEventHandlerTest
    # from your_app.events import YourAppEvent
    # from your_app.handlers import YourAppEventHandler  # Your concrete event handler
    # from your_app.services import NotificationService  # Example dependency

    # class TestYourAppEventHandler(BaseEventHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_handle_sends_notification(self):
    #         mock_notification_service = AsyncMock(spec=NotificationService)
    #         handler = YourAppEventHandler(notification_service=mock_notification_service)
    #         event_data = YourAppEvent(user_id="user1", message="Hello!")
    #
    #         await handler.handle(event_data)
    #
    #         mock_notification_service.send_user_notification.assert_awaited_once_with(
    #             user_id="user1", content="Hello!"
    #         )
7. Mocking External Event Publishers (testing.mock_event_publisher)
MockExternalEventPublisher (async) and MockExternalEventPublisherSync (sync)
- Purpose: These are mock implementations of the ExternalEventPublisher abstract class. They allow you to test components that publish events to external systems (like Kafka, RabbitMQ) without making actual network calls. They record published events in a published_events list for later assertion.
- Usage: Inject the appropriate mock publisher into the component under test. After the component performs an action that should publish an event, assert the content and number of events in mock_publisher.published_events.

    import pytest

    from castlecraft_engineer.testing.mock_event_publisher import MockExternalEventPublisher
    from castlecraft_engineer.abstractions.event import Event


    class MyEvent(Event):
        content: str


    class MyServiceThatPublishesExternally:
        def __init__(self, publisher: MockExternalEventPublisher):  # Type hint to ExternalEventPublisher in real code
            self.publisher = publisher

        async def do_work_and_publish(self, data: str):
            event_to_publish = MyEvent(content=data)
            await self.publisher.publish(event_to_publish)


    @pytest.mark.asyncio
    async def test_service_publishes_event_externally():
        mock_publisher = MockExternalEventPublisher()
        service = MyServiceThatPublishesExternally(publisher=mock_publisher)

        await service.do_work_and_publish("external_payload")

        assert len(mock_publisher.published_events) == 1
        published_event = mock_publisher.published_events[0]
        assert isinstance(published_event, MyEvent)
        assert published_event.content == "external_payload"
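The recording approach described above is simple enough to sketch independently of the library. The following assumes a hypothetical `RecordingPublisher` and `InvoiceIssued` event; it is not the implementation of `MockExternalEventPublisher`, only an illustration of a test double that appends to a `published_events` list instead of contacting a broker.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class InvoiceIssued:
    """Hypothetical event type used only for this sketch."""
    invoice_id: str


class RecordingPublisher:
    """Hypothetical test double; keeps events in memory instead of sending them."""

    def __init__(self) -> None:
        self.published_events: list = []

    async def publish(self, event) -> None:
        # No broker call: remember the event so the test can inspect it.
        self.published_events.append(event)


async def issue_invoices(publisher, invoice_ids) -> None:
    """Hypothetical component under test: publishes one event per invoice."""
    for invoice_id in invoice_ids:
        await publisher.publish(InvoiceIssued(invoice_id=invoice_id))


def run_example() -> list:
    publisher = RecordingPublisher()
    asyncio.run(issue_invoices(publisher, ["inv-1", "inv-2"]))
    return publisher.published_events
```

Because the events are stored in publication order, a test can assert on both the count and the sequence of what the component published.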
8. Testing Query Bus (testing.query_bus)
query_bus_helper (pytest fixture)
- Purpose: Similar to command_bus_instance, this fixture provides a QueryBus instance with a clean punq.Container for isolated testing of query dispatch and handler registration.
- Usage: Use this fixture to test if the query bus correctly dispatches queries to registered query handlers and returns their results.

    import pytest
    from unittest.mock import AsyncMock

    from castlecraft_engineer.abstractions.query_bus import QueryBus
    from castlecraft_engineer.testing.query_bus import (
        query_bus_helper,  # pytest fixture
        DummyQuery,
        DummyQueryHandler,
    )


    @pytest.mark.asyncio
    async def test_query_bus_dispatches_to_handler(query_bus_helper: QueryBus, mocker):
        # DummyQueryHandler.execute is an AsyncMock by default
        expected_result = {"data": "query_result"}
        mock_execute = AsyncMock(return_value=expected_result)
        mocker.patch.object(DummyQueryHandler, "execute", mock_execute)

        query_bus_helper.container.register(DummyQueryHandler)
        query_bus_helper.register_handler(DummyQuery, DummyQueryHandler)

        query = DummyQuery(param="search_term")
        result = await query_bus_helper.execute(query)

        mock_execute.assert_awaited_once_with(query)
        assert result == expected_result
9. Testing Query Handlers (testing.query_handler)
BaseQueryHandlerTest
- Purpose: A base class for testing individual QueryHandler implementations. It typically provides common mocks, such as an AsyncSession, which query handlers often need to fetch data.
- Usage: Inherit from this class. Your tests should focus on the handler's execute and authorize logic, verifying that it fetches and returns the correct data based on the query parameters and that authorization checks are performed.

    # from unittest.mock import AsyncMock
    # import pytest
    # from castlecraft_engineer.testing.query_handler import BaseQueryHandlerTest
    # from your_app.queries import YourActualQuery
    # from your_app.handlers import YourActualQueryHandler
    # from your_app.dto import YourQueryResultDTO
    # from your_app.repositories import YourReadModelRepository  # Example dependency

    # class TestYourActualQueryHandler(BaseQueryHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_execute_returns_correct_data(self):
    #         # self.mock_session and self.mock_auth_service are available
    #         mock_read_repo = AsyncMock(spec=YourReadModelRepository)
    #         expected_data = YourQueryResultDTO(id="123", name="Test Data")
    #         mock_read_repo.fetch_data_by_query_params = AsyncMock(return_value=expected_data)
    #
    #         handler = YourActualQueryHandler(
    #             session=self.mock_session,
    #             read_repo=mock_read_repo,
    #             auth_service=self.mock_auth_service
    #         )
    #         query = YourActualQuery(filter_param="test")
    #
    #         await handler.authorize(query)  # Assuming it passes or mock auth_service
    #         result = await handler.execute(query)
    #
    #         mock_read_repo.fetch_data_by_query_params.assert_awaited_once_with(self.mock_session, "test")
    #         assert result == expected_data
10. Mocking Authorization (testing.authorization)
MockAuthorizationService
- Purpose: A mock implementation of the AuthorizationService abstract base class. It allows you to easily control the outcome of permission checks in your tests without needing a full-fledged authorization setup.
- Usage: Inject MockAuthorizationService into your handlers or services under test. You can configure it to allow all permissions, deny all, or mock specific permission checks using standard mocking techniques (e.g., mocker.patch.object on its check_permission method). The BaseCommandHandlerTest and BaseQueryHandlerTest often provide an instance as self.mock_auth_service.

    import pytest
    from unittest.mock import AsyncMock

    from castlecraft_engineer.testing.authorization import MockAuthorizationService
    from castlecraft_engineer.authorization.permission import Permission
    from castlecraft_engineer.authorization.types import Action, Resource, Scope
    from castlecraft_engineer.exc import AuthorizationError

    CAN_DO_THING = Permission(action=Action.CREATE, resource=Resource("THING"), scope=Scope.ANY)


    class ServiceWithAuth:
        def __init__(self, auth_service: MockAuthorizationService):  # Type hint to AuthorizationService in real code
            self.auth_service = auth_service

        async def do_something_requiring_permission(self):
            await self.auth_service.check_permission(required_permissions=[CAN_DO_THING])
            return "Thing done!"


    @pytest.mark.asyncio
    async def test_service_with_auth_allowed(mocker):
        mock_auth_service = MockAuthorizationService()
        # Default MockAuthorizationService allows all, or you can be explicit:
        mocker.patch.object(mock_auth_service, 'check_permission', AsyncMock())

        service = ServiceWithAuth(auth_service=mock_auth_service)
        result = await service.do_something_requiring_permission()

        assert result == "Thing done!"
        mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING])


    @pytest.mark.asyncio
    async def test_service_with_auth_denied(mocker):
        mock_auth_service = MockAuthorizationService()
        mocker.patch.object(
            mock_auth_service,
            'check_permission',
            AsyncMock(side_effect=AuthorizationError("Permission denied")),
        )

        service = ServiceWithAuth(auth_service=mock_auth_service)
        with pytest.raises(AuthorizationError, match="Permission denied"):
            await service.do_something_requiring_permission()

        mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING])
By using these testing utilities, you can write more focused, maintainable, and reliable tests for your castlecraft-engineer-based applications.