Tutorial: Building a Complete Feature (Simple Task Management)¶
This tutorial will guide you through building a small but complete feature using the castlecraft-engineer
library. We'll create a simple task management system allowing users to create tasks, mark them as complete, and view them.
Prerequisites¶
Before you start, ensure you have:
1. Completed the steps in the Installation Guide.
2. Configured your environment for database access, particularly SQL_ASYNC_CONNECTION_STRING
. For this tutorial, SQLite is recommended for simplicity. Create a .env
file in your project root with:
SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./tasks_tutorial.db"
# Optional: For seeing SQL statements
# ENABLE_SQL_LOG="true"
# Optional: For encryption example (generate a real key for production)
# SECRET_ENCRYPTION_KEY="000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
Suggested Project Structure¶
For this tutorial, let's assume the following project structure within your application:
your_project_root/
├── .env
├── tasks_app/
│ ├── __init__.py
│ ├── models.py # Domain models, aggregates, SQLModels, events
│ ├── commands.py # Command definitions
│ ├── queries.py # Query definitions
│ ├── handlers.py # Command, Query, and Event handlers
│ ├── repositories.py # Repository implementations
│ └── di_setup.py # Dependency Injection setup for the tasks feature
└── main_runner.py # Script to execute the feature
Step 1: Domain Modeling (tasks_app/models.py
)¶
First, we define our domain entities: the Task
aggregate, its TaskModel
for persistence, and related events.
# tasks_app/models.py
import uuid
from enum import Enum
from datetime import datetime, timezone
from typing import Optional
from sqlmodel import SQLModel, Field
from castlecraft_engineer.abstractions.aggregate import Aggregate, Event
class TaskStatus(str, Enum):
OPEN = "open"
COMPLETED = "completed"
class TaskModel(SQLModel, table=True):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
title: str = Field(index=True)
description: Optional[str] = None
status: TaskStatus = Field(default=TaskStatus.OPEN)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
completed_at: Optional[datetime] = Field(default=None)
version: int = Field(default=1) # For optimistic concurrency
# --- Events ---
class TaskCreatedEvent(Event):
task_id: uuid.UUID
title: str
created_at: datetime
class TaskCompletedEvent(Event):
task_id: uuid.UUID
completed_at: datetime
# --- Aggregate ---
class Task(Aggregate[uuid.UUID, TaskModel]):
def __init__(self,
id: uuid.UUID,
title: str,
description: Optional[str],
status: TaskStatus,
created_at: datetime,
completed_at: Optional[datetime] = None,
version: int = 1):
super().__init__(id, version)
self.title = title
self.description = description
self.status = status
self.created_at = created_at
self.completed_at = completed_at
@classmethod
def create(cls, title: str, description: Optional[str]) -> "Task":
if not title:
raise ValueError("Task title cannot be empty.")
task_id = uuid.uuid4()
created_time = datetime.now(timezone.utc)
task = cls(
id=task_id,
title=title,
description=description,
status=TaskStatus.OPEN,
created_at=created_time,
version=1
)
task.record_event(TaskCreatedEvent(task_id=task.id, title=task.title, created_at=task.created_at))
return task
def complete(self):
if self.status == TaskStatus.COMPLETED:
# Optionally raise an error or just do nothing
print(f"Task {self.id} is already completed.")
return
self.status = TaskStatus.COMPLETED
self.completed_at = datetime.now(timezone.utc)
self.record_event(TaskCompletedEvent(task_id=self.id, completed_at=self.completed_at))
def to_model(self) -> TaskModel:
return TaskModel(
id=self.id,
title=self.title,
description=self.description,
status=self.status,
created_at=self.created_at,
completed_at=self.completed_at,
version=self.version
)
@classmethod
def from_model(cls, model: TaskModel) -> "Task":
return cls(
id=model.id,
title=model.title,
description=model.description,
status=model.status,
created_at=model.created_at,
completed_at=model.completed_at,
version=model.version
)
Step 2: Commands (tasks_app/commands.py
)¶
Define the commands that represent intentions to change the state of our tasks.
# tasks_app/commands.py
import uuid
from dataclasses import dataclass
from typing import Optional
from castlecraft_engineer.abstractions.command import Command
@dataclass(frozen=True)
class CreateTaskCommand(Command[uuid.UUID]): # Returns the created Task ID
title: str
description: Optional[str] = None
@dataclass(frozen=True)
class CompleteTaskCommand(Command[None]): # Returns nothing
task_id: uuid.UUID
Step 3: Repository (tasks_app/repositories.py
)¶
Implement the repository for our Task
aggregate.
# tasks_app/repositories.py
import uuid
from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository
from .models import Task, TaskModel
class TaskRepository(AsyncAggregateRepository[uuid.UUID, Task, TaskModel]):
# The base class provides get_by_id, save, delete methods.
# We can add custom query methods here if needed for the write side.
pass
Step 4: Queries (tasks_app/queries.py
)¶
Define queries to retrieve task data.
# tasks_app/queries.py
import uuid
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from castlecraft_engineer.abstractions.query import Query
from .models import TaskStatus # For ListOpenTasksQuery
@dataclass(frozen=True)
class GetTaskByIdQuery(Query[Optional[Dict[str, Any]]]): # Returns a dict or None
task_id: uuid.UUID
@dataclass(frozen=True)
class ListOpenTasksQuery(Query[List[Dict[str, Any]]]): # Returns a list of dicts
pass # No parameters needed for this simple query
Step 5: Handlers (tasks_app/handlers.py
)¶
Implement handlers for our commands, queries, and events.
# tasks_app/handlers.py
import uuid
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.query_handler import QueryHandler
from castlecraft_engineer.abstractions.event_handler import EventHandler
from castlecraft_engineer.abstractions.event_bus import EventBus
from castlecraft_engineer.database.repository import AsyncModelRepository # For query handlers
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.authorization.permission import ctx, Permission
from castlecraft_engineer.authorization.types import Action, Resource, Scope
from castlecraft_engineer.exc import AggregateNotFoundError, AuthorizationError
from .models import Task, TaskModel, TaskStatus, TaskCreatedEvent, TaskCompletedEvent
from .commands import CreateTaskCommand, CompleteTaskCommand
from .queries import GetTaskByIdQuery, ListOpenTasksQuery
from .repositories import TaskRepository
# --- Permissions (Example) ---
CAN_CREATE_TASK = Permission(action=Action.CREATE, resource=Resource("TASK"), scope=Scope.ANY)
CAN_COMPLETE_TASK = Permission(action=Action.UPDATE, resource=Resource("TASK"), scope=Scope.ANY) # Or Scope.OWN
CAN_VIEW_TASK = Permission(action=Action.READ, resource=Resource("TASK"), scope=Scope.ANY)
# --- Command Handlers ---
@ctx(required_permissions=[CAN_CREATE_TASK])
class CreateTaskHandler(CommandHandler[CreateTaskCommand, uuid.UUID]):
def __init__(self, session: AsyncSession, task_repo: TaskRepository, event_bus: EventBus, auth_service: AuthorizationService):
self.session = session
self.task_repo = task_repo
self.event_bus = event_bus
self.auth_service = auth_service
async def authorize(self, command: CreateTaskCommand) -> None:
required_perms = getattr(self, 'required_permissions', [])
# In a real app, context might include current_user_id
await self.auth_service.check_permission(required_permissions=required_perms)
async def execute(self, command: CreateTaskCommand) -> uuid.UUID:
task = Task.create(title=command.title, description=command.description)
await self.task_repo.save(task, self.session)
# Typically, events are published after successful commit by a Unit of Work
# For simplicity here, we publish directly. Consider UoW for production.
await self.event_bus.publish_batch(task.pull_uncommitted_events())
return task.id
@ctx(required_permissions=[CAN_COMPLETE_TASK])
class CompleteTaskHandler(CommandHandler[CompleteTaskCommand, None]):
def __init__(self, session: AsyncSession, task_repo: TaskRepository, event_bus: EventBus, auth_service: AuthorizationService):
self.session = session
self.task_repo = task_repo
self.event_bus = event_bus
self.auth_service = auth_service
async def authorize(self, command: CompleteTaskCommand) -> None:
required_perms = getattr(self, 'required_permissions', [])
# Context could include command.task_id to check ownership if scope is Scope.OWN
await self.auth_service.check_permission(required_permissions=required_perms, context={"task_id": command.task_id})
async def execute(self, command: CompleteTaskCommand) -> None:
task = await self.task_repo.get_by_id(command.task_id, self.session)
if not task:
raise AggregateNotFoundError(f"Task with ID {command.task_id} not found.")
task.complete()
await self.task_repo.save(task, self.session)
await self.event_bus.publish_batch(task.pull_uncommitted_events())
# --- Query Handlers ---
# For query handlers, we can use a generic AsyncModelRepository for TaskModel
class TaskModelRepository(AsyncModelRepository[TaskModel, uuid.UUID]):
def __init__(self):
super().__init__(model_cls=TaskModel)
@ctx(required_permissions=[CAN_VIEW_TASK])
class GetTaskByIdHandler(QueryHandler[GetTaskByIdQuery, Optional[Dict[str, Any]]]):
def __init__(self, session: AsyncSession, task_model_repo: TaskModelRepository, auth_service: AuthorizationService):
self.session = session
self.task_model_repo = task_model_repo
self.auth_service = auth_service
async def authorize(self, query: GetTaskByIdQuery) -> None:
required_perms = getattr(self, 'required_permissions', [])
await self.auth_service.check_permission(required_permissions=required_perms, context={"task_id": query.task_id})
async def execute(self, query: GetTaskByIdQuery) -> Optional[Dict[str, Any]]:
task_model = await self.task_model_repo.get_by_id(self.session, query.task_id)
return task_model.model_dump() if task_model else None
@ctx(required_permissions=[CAN_VIEW_TASK]) # Assuming listing also needs view permission
class ListOpenTasksHandler(QueryHandler[ListOpenTasksQuery, List[Dict[str, Any]]]):
def __init__(self, session: AsyncSession, task_model_repo: TaskModelRepository, auth_service: AuthorizationService):
self.session = session
self.task_model_repo = task_model_repo
self.auth_service = auth_service
async def authorize(self, query: ListOpenTasksQuery) -> None:
required_perms = getattr(self, 'required_permissions', [])
await self.auth_service.check_permission(required_permissions=required_perms)
async def execute(self, query: ListOpenTasksQuery) -> List[Dict[str, Any]]:
task_models = await self.task_model_repo.get_all(self.session, status=TaskStatus.OPEN, limit=100)
return [tm.model_dump() for tm in task_models]
# --- Event Handlers ---
class LoggingTaskCreatedEventHandler(EventHandler[TaskCreatedEvent]):
async def handle(self, event: TaskCreatedEvent) -> None:
print(f"EVENT LOG: Task '{event.title}' (ID: {event.task_id}) created at {event.created_at}.")
class LoggingTaskCompletedEventHandler(EventHandler[TaskCompletedEvent]):
async def handle(self, event: TaskCompletedEvent) -> None:
print(f"EVENT LOG: Task (ID: {event.task_id}) completed at {event.completed_at}.")
Step 6: Dependency Injection Setup (tasks_app/di_setup.py
)¶
Configure the DI container to register all our components.
# tasks_app/di_setup.py
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.common.env import SQL_ASYNC_CONNECTION_STRING
from castlecraft_engineer.authorization.base_service import AuthorizationService
from castlecraft_engineer.authorization.default_services import AllowAllAuthorizationService # For tutorial simplicity
from .models import TaskCreatedEvent, TaskCompletedEvent, TaskModel # TaskModel for bootstrap
from .commands import CreateTaskCommand, CompleteTaskCommand
from .queries import GetTaskByIdQuery, ListOpenTasksQuery
from .handlers import (
CreateTaskHandler, CompleteTaskHandler,
GetTaskByIdHandler, ListOpenTasksHandler,
LoggingTaskCreatedEventHandler, LoggingTaskCompletedEventHandler,
TaskModelRepository
)
from .repositories import TaskRepository
def configure_task_app_dependencies(builder: ContainerBuilder) -> ContainerBuilder:
# 0. Authorization Service (using AllowAll for this tutorial)
# In a real app, replace with your custom AuthorizationService implementation
builder.register(AuthorizationService, AllowAllAuthorizationService)
# 1. Repositories
builder.register(TaskRepository)
builder.register(TaskModelRepository) # For query side
# 2. Command Bus and Handlers
# with_command_bus() is called once globally, usually.
# If called per feature, ensure it's idempotent or handled correctly.
# For this tutorial, assume it's called before this function if setting up a global container.
# If this is the main DI setup, then call it here:
if not hasattr(builder, 'command_bus'): builder.with_command_bus()
builder.command_bus.register(CreateTaskCommand, CreateTaskHandler)
builder.command_bus.register(CompleteTaskCommand, CompleteTaskHandler)
# 3. Query Bus and Handlers
if not hasattr(builder, 'query_bus'): builder.with_query_bus()
builder.query_bus.register(GetTaskByIdQuery, GetTaskByIdHandler)
builder.query_bus.register(ListOpenTasksQuery, ListOpenTasksHandler)
# 4. Event Bus and Handlers
if not hasattr(builder, 'event_bus'): builder.with_event_bus()
builder.event_bus.subscribe(TaskCreatedEvent, LoggingTaskCreatedEventHandler)
builder.event_bus.subscribe(TaskCompletedEvent, LoggingTaskCompletedEventHandler)
return builder
async def bootstrap_task_schema():
# Ensure TaskModel is imported so SQLModel.metadata knows about it.
engine = create_async_engine(SQL_ASYNC_CONNECTION_STRING, echo=False)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
await engine.dispose()
print("Task database schema bootstrapped (if not already existing).")
Step 7: Execution (main_runner.py
)¶
Create a script to run our task management feature.
# main_runner.py
import asyncio
import os
import uuid
from dotenv import load_dotenv
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.command_bus import CommandBus
from castlecraft_engineer.abstractions.query_bus import QueryBus
from castlecraft_engineer.exc import AggregateNotFoundError, AuthorizationError
# Assuming your tasks_app is in the Python path
from tasks_app.commands import CreateTaskCommand, CompleteTaskCommand
from tasks_app.queries import GetTaskByIdQuery, ListOpenTasksQuery
from tasks_app.di_setup import configure_task_app_dependencies, bootstrap_task_schema
async def run_tutorial():
load_dotenv() # Load .env file
if not os.getenv("SQL_ASYNC_CONNECTION_STRING"):
print("Error: SQL_ASYNC_CONNECTION_STRING not set. Please create a .env file.")
return
# 1. Bootstrap schema
await bootstrap_task_schema()
# 2. Setup DI
builder = ContainerBuilder()
builder.with_async_database() # Registers AsyncSession factory
# If Command/Query/Event buses are not global, initialize them here
builder.with_command_bus()
builder.with_query_bus()
builder.with_event_bus()
builder = configure_task_app_dependencies(builder)
container = builder.build()
print("DI container configured.")
command_bus = container.resolve(CommandBus)
query_bus = container.resolve(QueryBus)
try:
# 3. Create a new task
print("
--- Creating Task ---")
task_title = "Learn Castlecraft Engineer"
task_desc = "Complete the tutorial for building a complete feature."
created_task_id = await command_bus.execute(
CreateTaskCommand(title=task_title, description=task_desc)
)
print(f"Task '{task_title}' created with ID: {created_task_id}")
# 4. Get the created task by ID
print("
--- Getting Task by ID ---")
task_details = await query_bus.execute(GetTaskByIdQuery(task_id=created_task_id))
if task_details:
print(f"Fetched Task: {task_details}")
else:
print(f"Could not fetch task with ID: {created_task_id}")
# 5. List open tasks
print("
--- Listing Open Tasks ---")
open_tasks = await query_bus.execute(ListOpenTasksQuery())
print(f"Open tasks ({len(open_tasks)}):")
for task in open_tasks:
print(f" - ID: {task['id']}, Title: {task['title']}, Status: {task['status']}")
# 6. Complete the task
print("
--- Completing Task ---")
await command_bus.execute(CompleteTaskCommand(task_id=created_task_id))
print(f"Task {created_task_id} marked as complete.")
# 7. Verify task is completed and not in open list
print("
--- Verifying Task Status ---")
completed_task_details = await query_bus.execute(GetTaskByIdQuery(task_id=created_task_id))
if completed_task_details:
print(f"Fetched Completed Task: {completed_task_details}")
assert completed_task_details['status'] == 'completed'
open_tasks_after_completion = await query_bus.execute(ListOpenTasksQuery())
print(f"Open tasks after completion ({len(open_tasks_after_completion)}):")
for task in open_tasks_after_completion:
print(f" - ID: {task['id']}, Title: {task['title']}, Status: {task['status']}")
assert not any(t['id'] == created_task_id for t in open_tasks_after_completion)
# Example of trying to complete an already completed task (optional to test idempotency)
# print("
--- Attempting to Complete Already Completed Task ---")
# await command_bus.execute(CompleteTaskCommand(task_id=created_task_id))
# Example of trying to fetch a non-existent task
print("
--- Attempting to Fetch Non-Existent Task ---")
non_existent_id = uuid.uuid4()
non_existent_task = await query_bus.execute(GetTaskByIdQuery(task_id=non_existent_id))
if not non_existent_task:
print(f"Correctly did not find task with ID: {non_existent_id}")
except AggregateNotFoundError as e:
print(f"Error: {e}")
except AuthorizationError as e:
print(f"Authorization Error: {e}. Check permissions and AuthorizationService setup.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(run_tutorial())
To run this tutorial:
1. Create the directory structure and files as shown.
2. Ensure python-dotenv
is installed (uv pip install python-dotenv
or pip install python-dotenv
).
3. Make sure castlecraft-engineer
and its dependencies (like sqlmodel
, sqlalchemy[asyncio]
, aiosqlite
) are installed in your environment.
4. Run python main_runner.py
from your_project_root/
.
Step 8: Authorization (Briefly Revisited)¶
In tasks_app/handlers.py
, we added @ctx(required_permissions=[...])
to our handlers:
# Example from CreateTaskHandler
# @ctx(required_permissions=[CAN_CREATE_TASK])
# class CreateTaskHandler(...):
# def __init__(self, ..., auth_service: AuthorizationService):
# self.auth_service = auth_service
#
# async def authorize(self, command: CreateTaskCommand) -> None:
# required_perms = getattr(self, 'required_permissions', [])
# await self.auth_service.check_permission(required_permissions=required_perms)
AllowAllAuthorizationService
in tasks_app/di_setup.py
for simplicity. In a real application:
1. You would implement a custom AuthorizationService
that checks the current user's roles/permissions against the required_perms
.
2. The context
parameter in check_permission
could be used for more granular checks (e.g., context={"task_id": command.task_id, "current_user_id": ...}
to verify if the user owns the task for Scope.OWN
).
3. The current_user_id
or user context would typically be obtained from an authentication mechanism (like a JWT in request headers) and made available to the handlers, possibly via DI or a request context object.
Refer to the Authorization Concepts for more details.
Conclusion¶
This tutorial demonstrated building a complete, albeit simple, feature using castlecraft-engineer
. We covered domain modeling, CQRS principles with commands and queries, event handling, repository pattern, dependency injection, and a brief look at authorization.
You can expand on this foundation by adding more complex business logic, robust error handling, user authentication, and more detailed authorization rules.