Skip to content

Testing Applications

castlecraft-engineer provides a suite of testing utilities located in src/castlecraft_engineer/testing/ to help you write effective unit and integration tests for your application components. These utilities aim to simplify test setup, provide useful mocks, and promote best practices.

This guide covers the purpose and basic usage of these testing helpers, primarily demonstrated with pytest.

1. Testing Aggregates and Repositories

testing.aggregate.BaseAggregateTest[T]

  • Purpose: A base class to help test Aggregate root business logic. It provides fixtures like aggregate_id and helper methods such as _create_aggregate, _load_aggregate_from_history, assert_event_recorded, and assert_no_events_recorded to facilitate testing aggregate state changes and event emissions in isolation from persistence.
  • Usage: Inherit from this class when testing your aggregate's methods. Define aggregate_class in your test subclass.
    # from your_domain.aggregates import YourAggregate, YourEvent, AnotherEvent
    # from castlecraft_engineer.testing.aggregate import BaseAggregateTest
    # import uuid
    
    # class TestYourAggregate(BaseAggregateTest[YourAggregate]):
    #     aggregate_class = YourAggregate
    
    #     def test_create_new_aggregate(self, aggregate_id: uuid.UUID):
    #         data_for_creation = {"name": "Test Aggregate"}
    #         aggregate = self._create_aggregate(aggregate_id, **data_for_creation)
    
    #         assert aggregate.id == aggregate_id
    #         assert aggregate.name == "Test Aggregate"
    #         self.assert_event_recorded(aggregate, YourEvent, attributes={"name": "Test Aggregate"})
    
    #     def test_perform_action_emits_event(self, aggregate_id: uuid.UUID):
    #         aggregate = self._create_aggregate(aggregate_id, name="Initial Name")
    #         self._clear_uncommitted_events(aggregate) # Clear creation event if not relevant
    
    #         aggregate.perform_some_action(new_value="Updated Value")
    
    #         assert aggregate.some_property == "Updated Value"
    #         self.assert_event_recorded(aggregate, AnotherEvent, attributes={"changed_value": "Updated Value"})
    

testing.aggregate.BaseAggregateRepositoryTest[T, M, Repo]

  • Purpose: Provides a base class for testing synchronous AggregateRepository implementations. It simplifies mocking the sqlmodel.Session (or sqlalchemy.orm.Session) and offers helper methods like setup_get_by_id_mock, assert_session_add_called, assert_session_commit_called, etc., to verify interactions with the session.
  • Usage: Inherit from this class to test your concrete synchronous aggregate repositories. Define repository_class, aggregate_class, and orm_model_class. Test methods will typically use the mock_session fixture.
    # from unittest.mock import MagicMock
    # import pytest
    # import uuid
    # from castlecraft_engineer.testing.aggregate import BaseAggregateRepositoryTest
    # from castlecraft_engineer.abstractions.aggregate import Aggregate
    # from castlecraft_engineer.abstractions.repository import AggregateRepository
    
    # # --- Define Your Domain Objects (Example) ---
    # class YourAggregate(Aggregate):
    #     def __init__(self, id: uuid.UUID, name: str = ""):
    #         super().__init__(id)
    #         self.name = name
    #     # ... other aggregate methods
    
    # class YourModel: # Your ORM model representation
    #     def __init__(self, id: uuid.UUID, name: str = ""):
    #         self.id = id
    #         self.name = name
    
    # class YourRepository(AggregateRepository[YourAggregate, YourModel]):
    #     def __init__(self, aggregate_cls, model_cls):
    #         super().__init__(aggregate_cls, model_cls)
    #     # Implement save, get_by_id, delete_by_id using synchronous session
    #     def save(self, aggregate: YourAggregate, session: MagicMock): # Session type for test
    #         # Example: Convert aggregate to ORM model and save
    #         orm_model = self._to_orm_model(aggregate) # Assume _to_orm_model exists
    #         session.add(orm_model)
    #         session.commit()
    #         session.refresh(orm_model)
    #         return aggregate # Or return None
    
    #     def _to_orm_model(self, aggregate: YourAggregate) -> YourModel:
    #         return YourModel(id=aggregate.id, name=aggregate.name)
    # # --- End Define Your Domain Objects ---
    
    # class TestYourSyncRepository(BaseAggregateRepositoryTest[YourAggregate, YourModel, YourRepository]):
    #     repository_class = YourRepository
    #     aggregate_class = YourAggregate
    #     orm_model_class = YourModel
    
    #     def test_save_aggregate(self, repository_instance: YourRepository, mock_session: MagicMock, sample_aggregate: YourAggregate):
    #         # Arrange: sample_aggregate is provided by a fixture
    #         # Mock the ORM model that would be added and refreshed
    #         # This step depends heavily on your _to_orm_model or mapping logic
    #         expected_orm_model = YourModel(id=sample_aggregate.id, name=sample_aggregate.name)
    
    #         # Act
    #         repository_instance.save(sample_aggregate, mock_session)
    
    #         # Assert
    #         # Use specific model instance for assertion if possible, or use mock.ANY for broader checks
    #         self.assert_session_add_called(mock_session, expected_orm_model) # Compares based on __eq__
    #         self.assert_session_commit_called(mock_session)
    #         self.assert_session_refresh_called(mock_session, expected_orm_model)
    

testing.aggregate.BaseAsyncAggregateRepositoryTest[T, M, AsyncRepo]

  • Purpose: Provides a base class for testing AsyncAggregateRepository implementations. It simplifies mocking the sqlalchemy.ext.asyncio.AsyncSession and offers helper methods like setup_get_by_id_mock_async, assert_session_add_called (for the synchronous add part), assert_session_commit_awaited, etc., to verify asynchronous interactions with the session.
  • Usage: Inherit from this class to test your concrete asynchronous aggregate repositories. Define repository_class, aggregate_class, and orm_model_class. Test methods will typically be async and use the mock_async_session fixture.
    # from unittest.mock import AsyncMock
    # import pytest
    # import uuid
    # from castlecraft_engineer.testing.aggregate import BaseAsyncAggregateRepositoryTest
    # from castlecraft_engineer.abstractions.aggregate import Aggregate
    # from castlecraft_engineer.abstractions.repository import AsyncAggregateRepository
    # from sqlalchemy.ext.asyncio import AsyncSession # For type hinting in real repo
    
    # # --- Define Your Domain Objects (Example) ---
    # class YourAggregate(Aggregate): # Same as above
    #     def __init__(self, id: uuid.UUID, name: str = ""):
    #         super().__init__(id)
    #         self.name = name
    
    # class YourModel: # Same as above
    #     def __init__(self, id: uuid.UUID, name: str = ""):
    #         self.id = id
    #         self.name = name
    
    # class YourAsyncRepository(AsyncAggregateRepository[YourAggregate, YourModel]):
    #     def __init__(self, aggregate_cls, model_cls):
    #         super().__init__(aggregate_cls, model_cls)
    #     # Implement save, get_by_id, delete_by_id using asynchronous session
    #     async def save(self, aggregate: YourAggregate, session: AsyncMock): # AsyncSession for real, AsyncMock for test
    #         orm_model = self._to_orm_model(aggregate)
    #         session.add(orm_model) # add is sync
    #         await session.commit()
    #         await session.refresh(orm_model)
    #         return aggregate
    
    #     async def get_by_id(self, aggregate_id: uuid.UUID, session: AsyncMock) -> YourAggregate | None:
    #         orm_model = await session.get(self.model_cls, aggregate_id)
    #         if not orm_model:
    #             return None
    #         return self._to_aggregate(orm_model) # Assume _to_aggregate exists
    
    #     def _to_orm_model(self, aggregate: YourAggregate) -> YourModel:
    #         return YourModel(id=aggregate.id, name=aggregate.name)
    
    #     def _to_aggregate(self, model: YourModel) -> YourAggregate:
    #         return YourAggregate(id=model.id, name=model.name)
    # # --- End Define Your Domain Objects ---
    
    # class TestYourAsyncRepository(BaseAsyncAggregateRepositoryTest[YourAggregate, YourModel, YourAsyncRepository]):
    #     repository_class = YourAsyncRepository
    #     aggregate_class = YourAggregate
    #     orm_model_class = YourModel
    
    #     @pytest.mark.asyncio
    #     async def test_save_and_get_aggregate(self, repository_instance: YourAsyncRepository, mock_async_session: AsyncMock, sample_aggregate: YourAggregate):
    #         # Arrange for save
    #         expected_orm_model_on_save = YourModel(id=sample_aggregate.id, name=sample_aggregate.name)
    
    #         # Act: Save
    #         await repository_instance.save(sample_aggregate, mock_async_session)
    
    #         # Assert: Save
    #         self.assert_session_add_called(mock_async_session, expected_orm_model_on_save)
    #         self.assert_session_commit_awaited(mock_async_session)
    #         self.assert_session_refresh_awaited(mock_async_session, expected_orm_model_on_save)
    
    #         # Arrange for get: Reset mocks if necessary or use different session mock for clarity
    #         mock_async_session.reset_mock() # Reset calls from save operation
    #         # Mock the ORM model that would be returned by session.get
    #         mock_orm_instance_for_get = YourModel(id=sample_aggregate.id, name=sample_aggregate.name)
    #         self.setup_get_by_id_mock_async(mock_async_session, mock_orm_instance_for_get)
    
    #         # Act: Get
    #         fetched_aggregate = await repository_instance.get_by_id(sample_aggregate.id, mock_async_session)
    
    #         # Assert: Get
    #         mock_async_session.get.assert_awaited_once_with(self.orm_model_class, sample_aggregate.id)
    #         assert fetched_aggregate is not None
    #         assert fetched_aggregate.id == sample_aggregate.id
    #         assert fetched_aggregate.name == sample_aggregate.name
    

testing.repository.BaseModelRepositoryTest[TSQLModel, SyncRepo]

  • Purpose: Provides a base class for testing synchronous ModelRepository implementations (from castlecraft_engineer.database.repository). It simplifies mocking the sqlalchemy.orm.Session and provides helper methods for common session operations like get, add, commit, refresh, and delete.
  • Usage: Inherit from this class. Define repository_class (your ModelRepository subclass) and model_class (your SQLModel or ORM model). Use fixtures like mock_session, sample_model_id, sample_model_instance, and repository_instance. Helper methods include setup_session_get_mock, setup_session_execute_mock, and assertion helpers for session calls.
    # from unittest.mock import MagicMock
    # import pytest
    # from typing import Optional
    # from sqlmodel import SQLModel, Field
    # from castlecraft_engineer.testing.repository import BaseModelRepositoryTest
    # from castlecraft_engineer.database.repository import ModelRepository
    # from sqlalchemy.orm import Session # For type hinting in real repo
    
    # # --- Define Your Domain Objects (Example) ---
    # class Item(SQLModel, table=True):
    #     id: Optional[int] = Field(default=None, primary_key=True)
    #     name: str
    
    # class ItemRepository(ModelRepository[Item]):
    #     # Your custom repository methods can go here if needed
    #     pass
    # # --- End Define Your Domain Objects ---
    
    # class TestItemRepository(BaseModelRepositoryTest[Item, ItemRepository]):
    #     repository_class = ItemRepository
    #     model_class = Item
    
    #     def test_create_item(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item):
    #         # Act
    #         created_item = repository_instance.create(mock_session, sample_model_instance)
    
    #         # Assert
    #         self.assert_session_add_called(mock_session, sample_model_instance)
    #         self.assert_session_commit_called(mock_session)
    #         self.assert_session_refresh_called(mock_session, sample_model_instance)
    #         assert created_item == sample_model_instance
    
    #     def test_get_item_by_id_found(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item):
    #         # Arrange
    #         # sample_model_id is available if needed, but sample_model_instance.id can also be used
    #         item_id = sample_model_instance.id
    #         self.setup_session_get_mock(mock_session, return_value=sample_model_instance)
    
    #         # Act
    #         found_item = repository_instance.get_by_id(mock_session, item_id)
    
    #         # Assert
    #         mock_session.get.assert_called_once_with(self.model_class, item_id)
    #         assert found_item == sample_model_instance
    
    #     def test_get_all_items(self, repository_instance: ItemRepository, mock_session: MagicMock, sample_model_instance: Item):
    #         # Arrange
    #         items_to_return = [sample_model_instance]
    #         self.setup_session_execute_mock(mock_session, return_value=items_to_return)
    
    #         # Act
    #         all_items = repository_instance.get_all(mock_session)
    
    #         # Assert
    #         mock_session.execute.assert_called_once() # Further assertion on statement if needed
    #         assert all_items == items_to_return
    

testing.repository.BaseAsyncModelRepositoryTest[TSQLModel, AsyncRepo]

  • Purpose: Provides a base class for testing AsyncModelRepository implementations (from castlecraft_engineer.database.repository). It simplifies mocking the sqlalchemy.ext.asyncio.AsyncSession and provides helper methods for common asynchronous session operations.
  • Usage: Inherit from this class. Define repository_class (your AsyncModelRepository subclass) and model_class (your SQLModel or ORM model). Use fixtures like mock_async_session, sample_model_id, sample_model_instance, and repository_instance. Test methods will typically be async. Helper methods include setup_session_get_mock_async, setup_session_execute_mock_async, and assertion helpers for awaited session calls.
    # from unittest.mock import AsyncMock
    # import pytest
    # from typing import Optional
    # from sqlmodel import SQLModel, Field
    # from castlecraft_engineer.testing.repository import BaseAsyncModelRepositoryTest
    # from castlecraft_engineer.database.repository import AsyncModelRepository
    # from sqlalchemy.ext.asyncio import AsyncSession # For type hinting in real repo
    
    # # --- Define Your Domain Objects (Example) ---
    # class Product(SQLModel, table=True):
    #     id: Optional[int] = Field(default=None, primary_key=True)
    #     name: str
    
    # class ProductAsyncRepository(AsyncModelRepository[Product]):
    #     # Your custom repository methods can go here if needed
    #     pass
    # # --- End Define Your Domain Objects ---
    
    # class TestProductAsyncRepository(BaseAsyncModelRepositoryTest[Product, ProductAsyncRepository]):
    #     repository_class = ProductAsyncRepository
    #     model_class = Product
    
    #     @pytest.mark.asyncio
    #     async def test_create_product(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product):
    #         # Act
    #         created_product = await repository_instance.create(mock_async_session, sample_model_instance)
    
    #         # Assert
    #         self.assert_session_add_called(mock_async_session, sample_model_instance) # add is sync
    #         self.assert_session_commit_awaited(mock_async_session)
    #         self.assert_session_refresh_awaited(mock_async_session, sample_model_instance)
    #         assert created_product == sample_model_instance
    
    #     @pytest.mark.asyncio
    #     async def test_get_product_by_id_found(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product):
    #         # Arrange
    #         product_id = sample_model_instance.id
    #         self.setup_session_get_mock_async(mock_async_session, return_value=sample_model_instance)
    
    #         # Act
    #         found_product = await repository_instance.get_by_id(mock_async_session, product_id)
    
    #         # Assert
    #         mock_async_session.get.assert_awaited_once_with(self.model_class, product_id)
    #         assert found_product == sample_model_instance
    
    #     @pytest.mark.asyncio
    #     async def test_get_all_products(self, repository_instance: ProductAsyncRepository, mock_async_session: AsyncMock, sample_model_instance: Product):
    #         # Arrange
    #         products_to_return = [sample_model_instance]
    #         self.setup_session_execute_mock_async(mock_async_session, return_value=products_to_return)
    
    #         # Act
    #         all_products = await repository_instance.get_all(mock_async_session)
    
    #         # Assert
    #         mock_async_session.execute.assert_awaited_once() # Further assertion on statement if needed
    #         assert all_products == products_to_return
    

2. Testing Command Bus (testing.command_bus)

command_bus_instance (pytest fixture)

  • Purpose: Provides a fully initialized CommandBus instance with a clean punq.Container for each test. This fixture uses clean_container_for_bus_tests to ensure test isolation.
  • Related Dummies: DummyCommand, DummyCommandHandler, AnotherDummyCommand, AnotherDummyCommandHandler are provided to test the bus's registration and dispatch mechanisms without needing real application commands/handlers.
  • Usage: Use this fixture to test if the command bus correctly dispatches commands to registered handlers.
    import pytest
    from unittest.mock import AsyncMock
    from castlecraft_engineer.abstractions.command_bus import CommandBus
    from castlecraft_engineer.testing.command_bus import (
        command_bus_instance, # pytest fixture
        DummyCommand, DummyCommandHandler,
        clean_container_for_bus_tests # Also a fixture, used by command_bus_instance
    )
    
    @pytest.mark.asyncio
    async def test_command_bus_dispatches_to_handler(command_bus_instance: CommandBus, mocker):
        # DummyCommandHandler.execute is an AsyncMock by default if not overridden
        # For this test, let's ensure it's an AsyncMock to check calls
        mock_execute = AsyncMock(return_value="dummy_result")
        mocker.patch.object(DummyCommandHandler, "execute", mock_execute)
    
        # Register the handler (command_bus_instance has a DI container)
        command_bus_instance.container.register(DummyCommandHandler) # Register the handler type
        command_bus_instance.register_handler(DummyCommand, DummyCommandHandler)
    
        cmd = DummyCommand(data="test_data")
        result = await command_bus_instance.execute(cmd)
    
        mock_execute.assert_awaited_once_with(cmd)
        assert result == "dummy_result"
    

3. Testing Command Handlers (testing.command_handler)

BaseCommandHandlerTest

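  • Purpose: A base class designed to simplify testing of individual CommandHandler implementations. It pre-initializes common mocks for the handler's dependencies: an AsyncSession (self.mock_session), an aggregate repository (self.mock_repo), an event bus (self.mock_event_bus), and an authorization service (self.mock_auth_service).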
  • Usage: Inherit from this class. Your test methods can then focus on the handler's execute and authorize logic, using the provided mocks to control dependency behavior and make assertions.
    # from unittest.mock import AsyncMock
    # import pytest
    # from castlecraft_engineer.testing.command_handler import BaseCommandHandlerTest
    # from your_app.commands import YourActualCommand
    # from your_app.handlers import YourActualCommandHandler
    # from your_app.models import YourAggregate
    # from your_app.repositories import YourAggregateRepository
    # from castlecraft_engineer.abstractions.event_bus import EventBus
    
    # class TestYourActualCommandHandler(BaseCommandHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_execute_creates_and_saves_aggregate(self):
    #         # self.mock_session, self.mock_repo, self.mock_event_bus are available from BaseCommandHandlerTest
    #         # self.mock_auth_service is also available
    
    #         # Configure mock_repo.save to do nothing or return a value if needed
    #         self.mock_repo.save = AsyncMock()
    #         self.mock_event_bus.publish_batch = AsyncMock()
    
    #         handler = YourActualCommandHandler(
    #             session=self.mock_session,
    #             repo=self.mock_repo,
    #             event_bus=self.mock_event_bus,
    #             auth_service=self.mock_auth_service
    #         )
    
    #         command = YourActualCommand(name="Test Product")
    #         await handler.authorize(command) # Assuming it passes or mock auth_service
    #         result_id = await handler.execute(command)
    
    #         self.mock_repo.save.assert_awaited_once()
    #         # Assert that the first argument to save was an instance of YourAggregate
    #         saved_aggregate = self.mock_repo.save.call_args[0][0]
    #         assert isinstance(saved_aggregate, YourAggregate)
    #         assert saved_aggregate.name == "Test Product"
    #         self.mock_event_bus.publish_batch.assert_awaited_once()
    #         assert result_id is not None
    

4. Testing Event Bus (testing.event_bus)

EventBusTestHelper

  • Purpose: A helper class to facilitate testing interactions with the EventBus. It allows for easy registration of mock event handlers and assertions on published events.
  • Usage: Instantiate the helper, subscribe mock handlers, publish events through the bus, and then assert that the mock handlers were called with the correct event.
    import pytest
    from unittest.mock import AsyncMock
    from castlecraft_engineer.abstractions.event import Event
    from castlecraft_engineer.abstractions.event_bus import EventBus
    from castlecraft_engineer.testing.event_bus import EventBusTestHelper
    
    class MyTestEvent(Event): data: str
    
    @pytest.mark.asyncio
    async def test_event_bus_with_helper():
        helper = EventBusTestHelper()
        event_bus = helper.event_bus # Get the EventBus instance from the helper
    
        mock_handler_one = AsyncMock()
        helper.subscribe_mock_handler(MyTestEvent, mock_handler_one)
    
        test_event = MyTestEvent(data="event_payload")
        await event_bus.publish(test_event)
    
        mock_handler_one.assert_awaited_once_with(test_event)
    

5. Testing Event Stream Consumers (testing.event_consumer)

BaseEventStreamConsumerTest

  • Purpose: Provides a base testing class for EventStreamConsumer implementations. It supplies a mock of the internal EventBus that the consumer publishes to (self.mock_event_bus) and may also offer mocks for external event sources.
  • Usage: Inherit from this class to test your consumer's logic for receiving, transforming, and dispatching events.
    # import pytest
    # from unittest.mock import AsyncMock
    # from castlecraft_engineer.testing.event_consumer import BaseEventStreamConsumerTest
    # from your_app.consumers import YourEventConsumer # Your concrete consumer
    # from your_app.events import ExternalEventType, InternalEventType
    
    # class TestYourEventConsumer(BaseEventStreamConsumerTest):
    #     @pytest.mark.asyncio
    #     async def test_process_message_publishes_to_internal_bus(self):
    #         # self.mock_event_bus is available from BaseEventStreamConsumerTest
    #         consumer = YourEventConsumer(event_bus=self.mock_event_bus)
    
    #         raw_external_message = {"id": "123", "payload": "data"} # Example external message
    #         # Simulate the consumer's internal message processing logic
    #         # This depends on how YourEventConsumer is structured
    #         # For example, if it has a method _process_raw_message:
    #         # await consumer._process_raw_message(raw_external_message)
    
    #         # Or, if testing a higher-level method that calls the bus:
    #         # await consumer.handle_incoming_message(raw_external_message)
    
    #         # Assert that the internal event bus's publish method was called with the correct InternalEventType
    #         self.mock_event_bus.publish.assert_awaited_once()
    #         published_event = self.mock_event_bus.publish.call_args[0][0]
    #         assert isinstance(published_event, InternalEventType)
    #         assert published_event.original_id == "123"
    

6. Testing Event Handlers (testing.event_handler)

BaseEventHandlerTest

  • Purpose: A base class for testing individual EventHandler implementations. It might provide common setup or mocks for dependencies the handler might have (e.g., services, repositories).
  • Usage: Inherit from this class. Your test methods should focus on the handler's handle logic, verifying that it performs the correct actions based on the received event.
    # import pytest
    # from unittest.mock import AsyncMock
    # from castlecraft_engineer.testing.event_handler import BaseEventHandlerTest
    # from your_app.events import YourAppEvent
    # from your_app.handlers import YourAppEventHandler # Your concrete event handler
    # from your_app.services import NotificationService # Example dependency
    
    # class TestYourAppEventHandler(BaseEventHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_handle_sends_notification(self):
    #         mock_notification_service = AsyncMock(spec=NotificationService)
    #         handler = YourAppEventHandler(notification_service=mock_notification_service)
    #         event_data = YourAppEvent(user_id="user1", message="Hello!")
    
    #         await handler.handle(event_data)
    
    #         mock_notification_service.send_user_notification.assert_awaited_once_with(
    #             user_id="user1", content="Hello!"
    #         )
    

7. Mocking External Event Publishers (testing.mock_event_publisher)

MockExternalEventPublisher (async) and MockExternalEventPublisherSync (sync)

  • Purpose: These are mock implementations of the ExternalEventPublisher abstract class. They allow you to test components that publish events to external systems (like Kafka, RabbitMQ) without making actual network calls. They record published events in a published_events list for later assertion.
  • Usage: Inject the appropriate mock publisher into the component under test. After the component performs an action that should publish an event, assert the content and number of events in mock_publisher.published_events.
    import pytest
    from castlecraft_engineer.testing.mock_event_publisher import MockExternalEventPublisher
    from castlecraft_engineer.abstractions.event import Event
    
    class MyEvent(Event): content: str
    
    class MyServiceThatPublishesExternally:
        def __init__(self, publisher: MockExternalEventPublisher): # Type hint to ExternalEventPublisher in real code
            self.publisher = publisher
        async def do_work_and_publish(self, data: str):
            event_to_publish = MyEvent(content=data)
            await self.publisher.publish(event_to_publish)
    
    @pytest.mark.asyncio
    async def test_service_publishes_event_externally():
        mock_publisher = MockExternalEventPublisher()
        service = MyServiceThatPublishesExternally(publisher=mock_publisher)
    
        await service.do_work_and_publish("external_payload")
    
        assert len(mock_publisher.published_events) == 1
        published_event = mock_publisher.published_events[0]
        assert isinstance(published_event, MyEvent)
        assert published_event.content == "external_payload"
    

8. Testing Query Bus (testing.query_bus)

query_bus_helper (pytest fixture)

  • Purpose: Similar to command_bus_instance, this fixture provides a QueryBus instance with a clean punq.Container for isolated testing of query dispatch and handler registration.
  • Usage: Use this fixture to test if the query bus correctly dispatches queries to registered query handlers and returns their results.
    import pytest
    from unittest.mock import AsyncMock
    from castlecraft_engineer.abstractions.query_bus import QueryBus
    from castlecraft_engineer.testing.query_bus import (
        query_bus_helper, # pytest fixture
        DummyQuery, DummyQueryHandler
    )
    
    @pytest.mark.asyncio
    async def test_query_bus_dispatches_to_handler(query_bus_helper: QueryBus, mocker):
        # DummyQueryHandler.execute is an AsyncMock by default
        expected_result = {"data": "query_result"}
        mock_execute = AsyncMock(return_value=expected_result)
        mocker.patch.object(DummyQueryHandler, "execute", mock_execute)
    
        query_bus_helper.container.register(DummyQueryHandler)
        query_bus_helper.register_handler(DummyQuery, DummyQueryHandler)
    
        query = DummyQuery(param="search_term")
        result = await query_bus_helper.execute(query)
    
        mock_execute.assert_awaited_once_with(query)
        assert result == expected_result
    

9. Testing Query Handlers (testing.query_handler)

BaseQueryHandlerTest

  • Purpose: A base class for testing individual QueryHandler implementations. It typically provides common mocks, such as an AsyncSession, which query handlers often need to fetch data.
  • Usage: Inherit from this class. Your tests should focus on the handler's execute and authorize logic, verifying that it fetches and returns the correct data based on the query parameters and that authorization checks are performed.
    # from unittest.mock import AsyncMock
    # import pytest
    # from castlecraft_engineer.testing.query_handler import BaseQueryHandlerTest
    # from your_app.queries import YourActualQuery
    # from your_app.handlers import YourActualQueryHandler
    # from your_app.dto import YourQueryResultDTO
    # from your_app.repositories import YourReadModelRepository # Example dependency
    
    # class TestYourActualQueryHandler(BaseQueryHandlerTest):
    #     @pytest.mark.asyncio
    #     async def test_execute_returns_correct_data(self):
    #         # self.mock_session and self.mock_auth_service are available
    #         mock_read_repo = AsyncMock(spec=YourReadModelRepository)
    #         expected_data = YourQueryResultDTO(id="123", name="Test Data")
    #         mock_read_repo.fetch_data_by_query_params = AsyncMock(return_value=expected_data)
    
    #         handler = YourActualQueryHandler(
    #             session=self.mock_session,
    #             read_repo=mock_read_repo,
    #             auth_service=self.mock_auth_service
    #         )
    
    #         query = YourActualQuery(filter_param="test")
    #         await handler.authorize(query) # Assuming it passes or mock auth_service
    #         result = await handler.execute(query)
    
    #         mock_read_repo.fetch_data_by_query_params.assert_awaited_once_with(self.mock_session, "test")
    #         assert result == expected_data
    

10. Mocking Authorization (testing.authorization)

MockAuthorizationService

  • Purpose: A mock implementation of the AuthorizationService abstract base class. It allows you to easily control the outcome of permission checks in your tests without needing a full-fledged authorization setup.
  • Usage: Inject MockAuthorizationService into your handlers or services under test. You can configure it to allow all permissions, deny all, or mock specific permission checks using standard mocking techniques (e.g., mocker.patch.object on its check_permission method). The BaseCommandHandlerTest and BaseQueryHandlerTest often provide an instance as self.mock_auth_service.
    import pytest
    from unittest.mock import AsyncMock
    from castlecraft_engineer.testing.authorization import MockAuthorizationService
    from castlecraft_engineer.authorization.permission import Permission
    from castlecraft_engineer.authorization.types import Action, Resource, Scope
    from castlecraft_engineer.exc import AuthorizationError
    
    CAN_DO_THING = Permission(action=Action.CREATE, resource=Resource("THING"), scope=Scope.ANY)
    
    class ServiceWithAuth:
        def __init__(self, auth_service: MockAuthorizationService): # Type hint to AuthorizationService in real code
            self.auth_service = auth_service
        async def do_something_requiring_permission(self):
            await self.auth_service.check_permission(required_permissions=[CAN_DO_THING])
            return "Thing done!"
    
    @pytest.mark.asyncio
    async def test_service_with_auth_allowed(mocker):
        mock_auth_service = MockAuthorizationService()
        # Default MockAuthorizationService allows all, or you can be explicit:
        mocker.patch.object(mock_auth_service, 'check_permission', AsyncMock())
        service = ServiceWithAuth(auth_service=mock_auth_service)
        result = await service.do_something_requiring_permission()
        assert result == "Thing done!"
        mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING])
    
    @pytest.mark.asyncio
    async def test_service_with_auth_denied(mocker):
        mock_auth_service = MockAuthorizationService()
        mocker.patch.object(mock_auth_service, 'check_permission', AsyncMock(side_effect=AuthorizationError("Permission denied")))
        service = ServiceWithAuth(auth_service=mock_auth_service)
        with pytest.raises(AuthorizationError, match="Permission denied"):
            await service.do_something_requiring_permission()
        mock_auth_service.check_permission.assert_awaited_once_with(required_permissions=[CAN_DO_THING])
    

By leveraging these testing utilities, you can write more focused, maintainable, and reliable tests for your castlecraft-engineer based applications.

11. Testing Event-Sourced Components with InMemoryEventStore

For testing aggregates, repositories, or command handlers that interact with an EventStore, Castlecraft Engineer provides InMemoryEventStore located in castlecraft_engineer.testing.event_store.

This in-memory implementation of the EventStore abstraction is perfect for creating fast, isolated unit and integration tests without needing an external database or event store system.

Key Features:

  • Full EventStore Implementation: Behaves like a real event store for appending, loading, and version checking.
  • Optimistic Concurrency: Correctly handles expected_version and raises EventStoreConflictError on mismatches.
  • Test Helpers:
    • clear(): Asynchronously clears all stored streams and versions, useful for test isolation (e.g., in pytest fixtures).
    • get_stream(aggregate_id): Asynchronously returns a copy of the event stream for a specific aggregate, useful for direct assertions on stored events.

Example Usage:

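Here's a basic example of how you might use InMemoryEventStore in a pytest test. Because the event_store fixture below is an async generator, run it with pytest-asyncio in auto mode, or decorate it with @pytest_asyncio.fixture instead of @pytest.fixture when using strict mode: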

import pytest
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List

from castlecraft_engineer.abstractions.event import Event
from castlecraft_engineer.testing import InMemoryEventStore

# Define a sample event for testing purposes
@dataclass(frozen=True)
class ItemCreated(Event):
    item_id: uuid.UUID
    name: str
    event_version: int = 1
    occurred_on: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @classmethod
    def event_type_name(cls) -> str:
        return "item.created"

@pytest.fixture
async def event_store() -> InMemoryEventStore[uuid.UUID]:
    store = InMemoryEventStore[uuid.UUID]()
    yield store
    await store.clear() # Clean up after test

@pytest.mark.asyncio
async def test_item_creation_events(event_store: InMemoryEventStore[uuid.UUID]):
    item_id = uuid.uuid4()
    create_event = ItemCreated(item_id=item_id, name="Test Item")

    # Append event for a new aggregate (expected_version = -1)
    await event_store.append_events(item_id, -1, [create_event])

    # Verify current version
    version = await event_store.get_current_version(item_id)
    assert version == 0  # Version is 0-indexed (sequence of last event)

    # Load events and verify
    loaded_events = await event_store.load_events(item_id)
    assert len(loaded_events) == 1
    assert loaded_events[0] == create_event

    # Use get_stream for direct inspection if needed
    stream_copy = await event_store.get_stream(item_id)
    assert stream_copy == [create_event]

By using InMemoryEventStore, you can thoroughly test the event-sourcing aspects of your application components in an isolated and efficient manner.