Tutorial: Building a Complete Feature (Simple Task Management)

This tutorial will guide you through building a small but complete feature using the castlecraft-engineer library. We'll create a simple task management system allowing users to create tasks, mark them as complete, and view them.

Prerequisites

Before you start, ensure you have:

1. Completed the steps in the Installation Guide.
2. Configured your environment for database access, particularly SQL_ASYNC_CONNECTION_STRING. For this tutorial, SQLite is recommended for simplicity.

Create a .env file in your project root with:

SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./tasks_tutorial.db"
# Optional: For seeing SQL statements
# ENABLE_SQL_LOG="true"
# Optional: For encryption example (generate a real key for production)
# SECRET_ENCRYPTION_KEY="000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
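
The sample SECRET_ENCRYPTION_KEY above is 64 hexadecimal characters, which corresponds to 32 bytes. Assuming the library expects a key of that shape (an inference from the sample value only, not something this tutorial confirms), a random replacement could be generated with the standard library. The helper name generate_hex_key is hypothetical.

```python
import secrets


def generate_hex_key(num_bytes: int = 32) -> str:
    """Return a random key encoded as 2 * num_bytes hexadecimal characters."""
    return secrets.token_hex(num_bytes)


# Print a line that can be pasted into the .env file.
key = generate_hex_key()
print(f'SECRET_ENCRYPTION_KEY="{key}"')
```

Each run prints a different key, so generate it once and store it somewhere safe rather than regenerating it at startup.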

Suggested Project Structure

For this tutorial, let's assume the following project structure within your application:

your_project_root/
├── .env
├── tasks_app/
│   ├── __init__.py
│   ├── models.py         # Domain models, aggregates, SQLModels, events
│   ├── commands.py       # Command definitions
│   ├── queries.py        # Query definitions
│   ├── handlers.py       # Command, Query, and Event handlers
│   ├── repositories.py   # Repository implementations
│   └── di_setup.py       # Dependency Injection setup for the tasks feature
└── main_runner.py        # Script to execute the feature

Step 1: Domain Modeling (tasks_app/models.py)

First, we define our domain entities: the Task aggregate, its TaskModel for persistence, and related events.

# tasks_app/models.py
import uuid
from enum import Enum
from datetime import datetime, timezone
from typing import Optional
from sqlmodel import SQLModel, Field
from castlecraft_engineer.abstractions.aggregate import Aggregate, Event

class TaskStatus(str, Enum):
    OPEN = "open"
    COMPLETED = "completed"

class TaskModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    title: str = Field(index=True)
    description: Optional[str] = None
    status: TaskStatus = Field(default=TaskStatus.OPEN)
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    completed_at: Optional[datetime] = Field(default=None)
    version: int = Field(default=1)  # For optimistic concurrency

# --- Events ---
class TaskCreatedEvent(Event):
    task_id: uuid.UUID
    title: str
    created_at: datetime

class TaskCompletedEvent(Event):
    task_id: uuid.UUID
    completed_at: datetime

# --- Aggregate ---
class Task(Aggregate[uuid.UUID, TaskModel]):
    def __init__(self,
                 id: uuid.UUID,
                 title: str,
                 description: Optional[str],
                 status: TaskStatus,
                 created_at: datetime,
                 completed_at: Optional[datetime] = None,
                 version: int = 1):
        super().__init__(id, version)
        self.title = title
        self.description = description
        self.status = status
        self.created_at = created_at
        self.completed_at = completed_at

    @classmethod
    def create(cls, title: str, description: Optional[str]) -> "Task":
        if not title:
            raise ValueError("Task title cannot be empty.")
        task_id = uuid.uuid4()
        created_time = datetime.now(timezone.utc)
        task = cls(
            id=task_id,
            title=title,
            description=description,
            status=TaskStatus.OPEN,
            created_at=created_time,
            version=1
        )
        task.record_event(TaskCreatedEvent(task_id=task.id, title=task.title, created_at=task.created_at))
        return task

    def complete(self):
        if self.status == TaskStatus.COMPLETED:
            # Optionally raise an error or just do nothing
            print(f"Task {self.id} is already completed.")
            return
        self.status = TaskStatus.COMPLETED
        self.completed_at = datetime.now(timezone.utc)
        self.record_event(TaskCompletedEvent(task_id=self.id, completed_at=self.completed_at))

    def to_model(self) -> TaskModel:
        return TaskModel(
            id=self.id,
            title=self.title,
            description=self.description,
            status=self.status,
            created_at=self.created_at,
            completed_at=self.completed_at,
            version=self.version
        )

    @classmethod
    def from_model(cls, model: TaskModel) -> "Task":
        return cls(
            id=model.id,
            title=model.title,
            description=model.description,
            status=model.status,
            created_at=model.created_at,
            completed_at=model.completed_at,
            version=model.version
        )
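
The aggregate above relies on record_event and pull_uncommitted_events from the library's Aggregate base class. As a rough, library-free illustration of that pattern, the sketch below buffers events on a stand-in aggregate and hands them over once. The names SketchTask and SketchTaskCompleted are hypothetical, and the assumption that pulling events also clears the buffer is mine, not something confirmed by the library.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List


@dataclass(frozen=True)
class SketchTaskCompleted:
    """Stand-in for TaskCompletedEvent (not the library's Event class)."""
    task_id: uuid.UUID
    completed_at: datetime


@dataclass
class SketchTask:
    """Stand-in aggregate that buffers events until they are pulled."""
    id: uuid.UUID
    status: str = "open"
    _events: List[object] = field(default_factory=list)

    def record_event(self, event: object) -> None:
        self._events.append(event)

    def pull_uncommitted_events(self) -> List[object]:
        # Hand the buffered events to the caller and clear the buffer.
        events, self._events = self._events, []
        return events

    def complete(self) -> None:
        if self.status == "completed":
            return  # Idempotent: a second call records nothing.
        self.status = "completed"
        self.record_event(SketchTaskCompleted(self.id, datetime.now(timezone.utc)))


task = SketchTask(id=uuid.uuid4())
task.complete()
task.complete()  # No second event is recorded.
print(len(task.pull_uncommitted_events()))  # 1
print(len(task.pull_uncommitted_events()))  # 0
```

Keeping the state change and the event recording in the same method means a handler cannot change the status without also producing the matching event.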

Step 2: Commands (tasks_app/commands.py)

Define the commands that represent intentions to change the state of our tasks.

# tasks_app/commands.py
import uuid
from dataclasses import dataclass
from typing import Optional
from castlecraft_engineer.abstractions.command import Command

@dataclass(frozen=True)
class CreateTaskCommand(Command[uuid.UUID]): # Returns the created Task ID
    title: str
    description: Optional[str] = None

@dataclass(frozen=True)
class CompleteTaskCommand(Command[None]): # Returns nothing
    task_id: uuid.UUID
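
Both commands are declared with @dataclass(frozen=True), so an instance cannot be modified after it is built. The sketch below shows that behavior on a plain stand-in class (SketchCreateTask is hypothetical and omits the library's Command base class).

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchCreateTask:
    """Plain stand-in for CreateTaskCommand, without the library base class."""
    title: str
    description: Optional[str] = None


cmd = SketchCreateTask(title="Write docs")

try:
    cmd.title = "Changed"  # Assignment on a frozen dataclass raises.
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a command, build a new instance instead.
renamed = replace(cmd, title="Write better docs")
print(mutated, cmd.title, renamed.title)  # False Write docs Write better docs
```

Immutability matters here because a command may pass through several layers (bus, authorization, handler) and each should see the same intent the caller expressed.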

Step 3: Repository (tasks_app/repositories.py)

Implement the repository for our Task aggregate.

# tasks_app/repositories.py
import uuid
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository
from .models import Task, TaskModel

class TaskRepository(AsyncAggregateRepository[uuid.UUID, Task, TaskModel]):
    # The base class provides get_by_id, save, delete methods.
    # We can add custom query methods here if needed for the write side.
    pass
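
TaskModel carries a version field that Step 1 labels as being for optimistic concurrency. How the library's repository enforces it is not shown in this tutorial, so the following is only a generic, in-memory sketch of the idea: a write succeeds only if the stored version still matches the version the writer loaded. All names (Row, InMemoryStore, VersionConflict) are hypothetical.

```python
import uuid
from dataclasses import dataclass
from typing import Dict


class VersionConflict(Exception):
    """Raised when the stored version differs from the one the caller loaded."""


@dataclass
class Row:
    id: uuid.UUID
    title: str
    version: int = 1


class InMemoryStore:
    """Hypothetical store; a SQL backend would typically do this check in the UPDATE."""

    def __init__(self) -> None:
        self._rows: Dict[uuid.UUID, Row] = {}

    def insert(self, row: Row) -> None:
        self._rows[row.id] = Row(row.id, row.title, row.version)

    def load(self, row_id: uuid.UUID) -> Row:
        stored = self._rows[row_id]
        return Row(stored.id, stored.title, stored.version)  # Return a copy.

    def update(self, row: Row) -> None:
        stored = self._rows[row.id]
        if stored.version != row.version:
            raise VersionConflict(f"expected v{row.version}, found v{stored.version}")
        self._rows[row.id] = Row(row.id, row.title, row.version + 1)


store = InMemoryStore()
task_id = uuid.uuid4()
store.insert(Row(task_id, "draft"))

first = store.load(task_id)   # Both callers load version 1.
second = store.load(task_id)

first.title = "edited by first"
store.update(first)           # Succeeds; stored version becomes 2.

second.title = "edited by second"
try:
    store.update(second)      # Still carries version 1, so it is rejected.
    conflict = False
except VersionConflict:
    conflict = True

print(conflict, store.load(task_id).title, store.load(task_id).version)
# True edited by first 2
```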

Step 4: Queries (tasks_app/queries.py)

Define queries to retrieve task data.

# tasks_app/queries.py
import uuid
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from castlecraft_engineer.abstractions.query import Query
from .models import TaskStatus # For ListOpenTasksQuery

@dataclass(frozen=True)
class GetTaskByIdQuery(Query[Optional[Dict[str, Any]]]): # Returns a dict or None
    task_id: uuid.UUID

@dataclass(frozen=True)
class ListOpenTasksQuery(Query[List[Dict[str, Any]]]): # Returns a list of dicts
    pass # No parameters needed for this simple query

Step 5: Handlers (tasks_app/handlers.py)

Implement handlers for our commands, queries, and events.

# tasks_app/handlers.py
import uuid
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession

from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.abstractions.event_handler import EventHandler
from castlecraft_engineer.abstractions.event_bus import EventBus
from castlecraft_engineer.database.repository import AsyncModelRepository # For query handlers
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.authorization.permission import ctx, Permission
from castlecraft_engineer.authorization.types import Action, Resource, Scope
from castlecraft_engineer.exc import AggregateNotFoundError, AuthorizationError

from .models import Task, TaskModel, TaskStatus, TaskCreatedEvent, TaskCompletedEvent
from .commands import CreateTaskCommand, CompleteTaskCommand
from .queries import GetTaskByIdQuery, ListOpenTasksQuery
from .repositories import TaskRepository

# --- Permissions (Example) ---
CAN_CREATE_TASK = Permission(action=Action.CREATE, resource=Resource("TASK"), scope=Scope.ANY)
CAN_COMPLETE_TASK = Permission(action=Action.UPDATE, resource=Resource("TASK"), scope=Scope.ANY) # Or Scope.OWN
CAN_VIEW_TASK = Permission(action=Action.READ, resource=Resource("TASK"), scope=Scope.ANY)

# --- Command Handlers ---
@ctx(required_permissions=[CAN_CREATE_TASK])
class CreateTaskHandler(CommandHandler[CreateTaskCommand, uuid.UUID]):
    def __init__(self, session: AsyncSession, task_repo: TaskRepository, event_bus: EventBus, auth_service: AuthorizationService):
        self.session = session
        self.task_repo = task_repo
        self.event_bus = event_bus
        self.auth_service = auth_service

    async def authorize(self, command: CreateTaskCommand) -> None:
        required_perms = getattr(self, 'required_permissions', [])
        # In a real app, context might include current_user_id
        await self.auth_service.check_permission(required_permissions=required_perms)

    async def execute(self, command: CreateTaskCommand) -> uuid.UUID:
        task = Task.create(title=command.title, description=command.description)
        await self.task_repo.save(task, self.session)
        # Typically, events are published after successful commit by a Unit of Work
        # For simplicity here, we publish directly. Consider UoW for production.
        await self.event_bus.publish_batch(task.pull_uncommitted_events())
        return task.id

@ctx(required_permissions=[CAN_COMPLETE_TASK])
class CompleteTaskHandler(CommandHandler[CompleteTaskCommand, None]):
    def __init__(self, session: AsyncSession, task_repo: TaskRepository, event_bus: EventBus, auth_service: AuthorizationService):
        self.session = session
        self.task_repo = task_repo
        self.event_bus = event_bus
        self.auth_service = auth_service

    async def authorize(self, command: CompleteTaskCommand) -> None:
        required_perms = getattr(self, 'required_permissions', [])
        # Context could include command.task_id to check ownership if scope is Scope.OWN
        await self.auth_service.check_permission(required_permissions=required_perms, context={"task_id": command.task_id})

    async def execute(self, command: CompleteTaskCommand) -> None:
        task = await self.task_repo.get_by_id(command.task_id, self.session)
        if not task:
            raise AggregateNotFoundError(f"Task with ID {command.task_id} not found.")
        task.complete()
        await self.task_repo.save(task, self.session)
        await self.event_bus.publish_batch(task.pull_uncommitted_events())

# --- Query Handlers ---
# For query handlers, we can use a generic AsyncModelRepository for TaskModel
class TaskModelRepository(AsyncModelRepository[TaskModel, uuid.UUID]):
    def __init__(self):
        super().__init__(model_cls=TaskModel)

@ctx(required_permissions=[CAN_VIEW_TASK])
class GetTaskByIdHandler(QueryHandler[GetTaskByIdQuery, Optional[Dict[str, Any]]]):
    def __init__(self, session: AsyncSession, task_model_repo: TaskModelRepository, auth_service: AuthorizationService):
        self.session = session
        self.task_model_repo = task_model_repo
        self.auth_service = auth_service

    async def authorize(self, query: GetTaskByIdQuery) -> None:
        required_perms = getattr(self, 'required_permissions', [])
        await self.auth_service.check_permission(required_permissions=required_perms, context={"task_id": query.task_id})

    async def execute(self, query: GetTaskByIdQuery) -> Optional[Dict[str, Any]]:
        task_model = await self.task_model_repo.get_by_id(self.session, query.task_id)
        return task_model.model_dump() if task_model else None

@ctx(required_permissions=[CAN_VIEW_TASK]) # Assuming listing also needs view permission
class ListOpenTasksHandler(QueryHandler[ListOpenTasksQuery, List[Dict[str, Any]]]):
    def __init__(self, session: AsyncSession, task_model_repo: TaskModelRepository, auth_service: AuthorizationService):
        self.session = session
        self.task_model_repo = task_model_repo
        self.auth_service = auth_service

    async def authorize(self, query: ListOpenTasksQuery) -> None:
        required_perms = getattr(self, 'required_permissions', [])
        await self.auth_service.check_permission(required_permissions=required_perms)

    async def execute(self, query: ListOpenTasksQuery) -> List[Dict[str, Any]]:
        task_models = await self.task_model_repo.get_all(self.session, status=TaskStatus.OPEN, limit=100)
        return [tm.model_dump() for tm in task_models]

# --- Event Handlers ---
class LoggingTaskCreatedEventHandler(EventHandler[TaskCreatedEvent]):
    async def handle(self, event: TaskCreatedEvent) -> None:
        print(f"EVENT LOG: Task '{event.title}' (ID: {event.task_id}) created at {event.created_at}.")

class LoggingTaskCompletedEventHandler(EventHandler[TaskCompletedEvent]):
    async def handle(self, event: TaskCompletedEvent) -> None:
        print(f"EVENT LOG: Task (ID: {event.task_id}) completed at {event.completed_at}.")
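
The comments in CreateTaskHandler note that events are typically published only after a successful commit, usually by a Unit of Work. The sketch below illustrates that ordering with stand-ins only: SketchSession, commit_then_publish, and the publish callback are hypothetical and do not represent the library's Unit of Work or EventBus.

```python
import asyncio
from typing import Awaitable, Callable, List


class SketchSession:
    """Hypothetical session that only records whether commit succeeded."""

    def __init__(self, fail_on_commit: bool = False) -> None:
        self.fail_on_commit = fail_on_commit
        self.committed = False

    async def commit(self) -> None:
        if self.fail_on_commit:
            raise RuntimeError("commit failed")
        self.committed = True

    async def rollback(self) -> None:
        self.committed = False


async def commit_then_publish(
    session: SketchSession,
    events: List[object],
    publish: Callable[[List[object]], Awaitable[None]],
) -> None:
    """Commit first; publish only if the commit did not raise."""
    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    await publish(events)


async def demo(fail: bool) -> List[object]:
    published: List[object] = []

    async def publish(batch: List[object]) -> None:
        published.extend(batch)

    session = SketchSession(fail_on_commit=fail)
    try:
        await commit_then_publish(session, ["TaskCreated"], publish)
    except RuntimeError:
        pass  # The failed commit means nothing was published.
    return published


print(asyncio.run(demo(fail=False)))  # ['TaskCreated']
print(asyncio.run(demo(fail=True)))   # []
```

With this ordering, subscribers never react to a task that was rolled back. The trade-off is that a crash between commit and publish loses the events, which is why production systems often add an outbox or retry mechanism.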

Step 6: Dependency Injection Setup (tasks_app/di_setup.py)

Configure the DI container to register all our components.

# tasks_app/di_setup.py
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel

from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.common.env import SQL_ASYNC_CONNECTION_STRING
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For tutorial simplicity

from .models import TaskCreatedEvent, TaskCompletedEvent, TaskModel # TaskModel for bootstrap
from .commands import CreateTaskCommand, CompleteTaskCommand
from .queries import GetTaskByIdQuery, ListOpenTasksQuery
from .handlers import (
    CreateTaskHandler, CompleteTaskHandler,
    GetTaskByIdHandler, ListOpenTasksHandler,
    LoggingTaskCreatedEventHandler, LoggingTaskCompletedEventHandler,
    TaskModelRepository
)
from .repositories import TaskRepository

def configure_task_app_dependencies(builder: ContainerBuilder) -> ContainerBuilder:
    # 0. Authorization Service (using AllowAll for this tutorial)
    # In a real app, replace with your custom AuthorizationService implementation
    builder.register(AuthorizationService, AllowAllAuthorizationService)

    # 1. Repositories
    builder.register(TaskRepository)
    builder.register(TaskModelRepository) # For query side

    # 2. Command Bus and Handlers
    # with_command_bus() is called once globally, usually.
    # If called per feature, ensure it's idempotent or handled correctly.
    # For this tutorial, assume it's called before this function if setting up a global container.
    # If this is the main DI setup, then call it here:
    if not hasattr(builder, 'command_bus'): builder.with_command_bus()
    builder.command_bus.register(CreateTaskCommand, CreateTaskHandler)
    builder.command_bus.register(CompleteTaskCommand, CompleteTaskHandler)

    # 3. Query Bus and Handlers
    if not hasattr(builder, 'query_bus'): builder.with_query_bus()
    builder.query_bus.register(GetTaskByIdQuery, GetTaskByIdHandler)
    builder.query_bus.register(ListOpenTasksQuery, ListOpenTasksHandler)

    # 4. Event Bus and Handlers
    if not hasattr(builder, 'event_bus'): builder.with_event_bus()
    builder.event_bus.subscribe(TaskCreatedEvent, LoggingTaskCreatedEventHandler)
    builder.event_bus.subscribe(TaskCompletedEvent, LoggingTaskCompletedEventHandler)

    return builder

async def bootstrap_task_schema():
    # Ensure TaskModel is imported so SQLModel.metadata knows about it.
    engine = create_async_engine(SQL_ASYNC_CONNECTION_STRING, echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    await engine.dispose()
    print("Task database schema bootstrapped (if not already existing).")
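
The register calls above associate each command or query type with a handler class. To make that dispatch idea concrete, here is a minimal, library-free sketch of a bus keyed by message type. SketchCommandBus, Greet, and GreetHandler are hypothetical, and the real CommandBus may resolve and invoke handlers differently (for example, by resolving constructor arguments from the DI container and calling authorize before execute).

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict, Type


@dataclass(frozen=True)
class Greet:
    """Hypothetical command used only in this sketch."""
    name: str


class GreetHandler:
    async def execute(self, command: Greet) -> str:
        return f"Hello, {command.name}!"


class SketchCommandBus:
    """Maps a command type to a factory that builds its handler on demand."""

    def __init__(self) -> None:
        self._factories: Dict[Type[Any], Callable[[], Any]] = {}

    def register(self, command_type: Type[Any], handler_factory: Callable[[], Any]) -> None:
        if command_type in self._factories:
            raise ValueError(f"{command_type.__name__} already has a handler")
        self._factories[command_type] = handler_factory

    async def execute(self, command: Any) -> Any:
        try:
            factory = self._factories[type(command)]
        except KeyError:
            raise LookupError(f"No handler registered for {type(command).__name__}") from None
        handler = factory()  # A DI container would supply constructor arguments here.
        return await handler.execute(command)


bus = SketchCommandBus()
bus.register(Greet, GreetHandler)
result = asyncio.run(bus.execute(Greet(name="Ada")))
print(result)  # Hello, Ada!
```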

Step 7: Execution (main_runner.py)

Create a script to run our task management feature.

# main_runner.py
import asyncio
import os
import uuid
from dotenv import load_dotenv

from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.exc import AggregateNotFoundError, AuthorizationError

# Assuming your tasks_app is in the Python path
from tasks_app.commands import CreateTaskCommand, CompleteTaskCommand
from tasks_app.queries import GetTaskByIdQuery, ListOpenTasksQuery
from tasks_app.di_setup import configure_task_app_dependencies, bootstrap_task_schema

async def run_tutorial():
    load_dotenv() # Load .env file

    if not os.getenv("SQL_ASYNC_CONNECTION_STRING"):
        print("Error: SQL_ASYNC_CONNECTION_STRING not set. Please create a .env file.")
        return

    # 1. Bootstrap schema
    await bootstrap_task_schema()

    # 2. Setup DI
    builder = ContainerBuilder()
    builder.with_async_database() # Registers AsyncSession factory
    # If Command/Query/Event buses are not global, initialize them here
    builder.with_command_bus()
    builder.with_query_bus()
    builder.with_event_bus()

    builder = configure_task_app_dependencies(builder)
    container = builder.build()
    print("DI container configured.")

    command_bus = container.resolve(CommandBus)
    query_bus = container.resolve(QueryBus)

    try:
        # 3. Create a new task
        print("\n--- Creating Task ---")
        task_title = "Learn Castlecraft Engineer"
        task_desc = "Complete the tutorial for building a complete feature."
        created_task_id = await command_bus.execute(
            CreateTaskCommand(title=task_title, description=task_desc)
        )
        print(f"Task '{task_title}' created with ID: {created_task_id}")

        # 4. Get the created task by ID
        print("\n--- Getting Task by ID ---")
        task_details = await query_bus.execute(GetTaskByIdQuery(task_id=created_task_id))
        if task_details:
            print(f"Fetched Task: {task_details}")
        else:
            print(f"Could not fetch task with ID: {created_task_id}")

        # 5. List open tasks
        print("\n--- Listing Open Tasks ---")
        open_tasks = await query_bus.execute(ListOpenTasksQuery())
        print(f"Open tasks ({len(open_tasks)}):")
        for task in open_tasks:
            print(f"  - ID: {task['id']}, Title: {task['title']}, Status: {task['status']}")

        # 6. Complete the task
        print("\n--- Completing Task ---")
        await command_bus.execute(CompleteTaskCommand(task_id=created_task_id))
        print(f"Task {created_task_id} marked as complete.")

        # 7. Verify task is completed and not in open list
        print("\n--- Verifying Task Status ---")
        completed_task_details = await query_bus.execute(GetTaskByIdQuery(task_id=created_task_id))
        if completed_task_details:
            print(f"Fetched Completed Task: {completed_task_details}")
            assert completed_task_details['status'] == 'completed'

        open_tasks_after_completion = await query_bus.execute(ListOpenTasksQuery())
        print(f"Open tasks after completion ({len(open_tasks_after_completion)}):")
        for task in open_tasks_after_completion:
            print(f"  - ID: {task['id']}, Title: {task['title']}, Status: {task['status']}")
        assert not any(t['id'] == created_task_id for t in open_tasks_after_completion)

        # Example of trying to complete an already completed task (optional to test idempotency)
        # print("\n--- Attempting to Complete Already Completed Task ---")
        # await command_bus.execute(CompleteTaskCommand(task_id=created_task_id))

        # Example of trying to fetch a non-existent task
        print("\n--- Attempting to Fetch Non-Existent Task ---")
        non_existent_id = uuid.uuid4()
        non_existent_task = await query_bus.execute(GetTaskByIdQuery(task_id=non_existent_id))
        if not non_existent_task:
            print(f"Correctly did not find task with ID: {non_existent_id}")

    except AggregateNotFoundError as e:
        print(f"Error: {e}")
    except AuthorizationError as e:
        print(f"Authorization Error: {e}. Check permissions and AuthorizationService setup.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    asyncio.run(run_tutorial())

To run this tutorial:

1. Create the directory structure and files as shown.
2. Ensure python-dotenv is installed (uv pip install python-dotenv or pip install python-dotenv).
3. Make sure castlecraft-engineer and its dependencies (such as sqlmodel, sqlalchemy[asyncio], and aiosqlite) are installed in your environment.
4. Run python main_runner.py from your_project_root/.

Step 8: Authorization (Briefly Revisited)

In tasks_app/handlers.py, we added @ctx(required_permissions=[...]) to our handlers:

# Example from CreateTaskHandler
# @ctx(required_permissions=[CAN_CREATE_TASK])
# class CreateTaskHandler(...):
#     def __init__(self, ..., auth_service: AuthorizationService):
#         self.auth_service = auth_service
#
#     async def authorize(self, command: CreateTaskCommand) -> None:
#         required_perms = getattr(self, 'required_permissions', [])
#         await self.auth_service.check_permission(required_permissions=required_perms)

We used AllowAllAuthorizationService in tasks_app/di_setup.py for simplicity. In a real application:

1. You would implement a custom AuthorizationService that checks the current user's roles/permissions against the required_perms.
2. The context parameter in check_permission could be used for more granular checks (e.g., context={"task_id": command.task_id, "current_user_id": ...} to verify that the user owns the task for Scope.OWN).
3. The current_user_id or user context would typically be obtained from an authentication mechanism (like a JWT in request headers) and made available to the handlers, possibly via DI or a request context object.

Refer to the Authorization Concepts for more details.
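
As a rough illustration of the kind of check a custom service might perform, the sketch below compares required permissions against a user's grants and applies an ownership test for an "own" scope. It is deliberately library-free: SketchPermission, SketchScope, SketchAuthService, SketchDenied, and the owner_id context key are all hypothetical, and a real implementation would subclass the library's AuthorizationService and use its Permission and Scope types instead.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, FrozenSet, Iterable, Optional


class SketchScope(str, Enum):
    ANY = "any"
    OWN = "own"


@dataclass(frozen=True)
class SketchPermission:
    action: str
    resource: str
    scope: SketchScope = SketchScope.ANY


class SketchDenied(Exception):
    """Raised when a required permission is missing or ownership fails."""


class SketchAuthService:
    """Hypothetical service: grants is the set of permissions the user holds."""

    def __init__(self, user_id: str, grants: Iterable[SketchPermission]) -> None:
        self.user_id = user_id
        self.grants: FrozenSet[SketchPermission] = frozenset(grants)

    async def check_permission(
        self,
        required_permissions: Iterable[SketchPermission],
        context: Optional[Dict[str, Any]] = None,
    ) -> None:
        context = context or {}
        for perm in required_permissions:
            if perm not in self.grants:
                raise SketchDenied(f"missing {perm.action}:{perm.resource}")
            # For OWN scope, the caller must supply a matching resource owner.
            if perm.scope is SketchScope.OWN and context.get("owner_id") != self.user_id:
                raise SketchDenied("resource is not owned by the current user")


complete_own = SketchPermission("update", "TASK", SketchScope.OWN)
auth = SketchAuthService(user_id="alice", grants=[complete_own])


def allowed(owner_id: str) -> bool:
    try:
        asyncio.run(auth.check_permission([complete_own], {"owner_id": owner_id}))
        return True
    except SketchDenied:
        return False


print(allowed("alice"), allowed("bob"))  # True False
```

Raising an exception rather than returning a boolean mirrors how the handlers above call check_permission without inspecting a return value.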

Conclusion

This tutorial demonstrated building a complete, albeit simple, feature using castlecraft-engineer. We covered domain modeling, CQRS principles with commands and queries, event handling, repository pattern, dependency injection, and a brief look at authorization.

You can expand on this foundation by adding more complex business logic, robust error handling, user authentication, and more detailed authorization rules.