Quickstart Guide
This guide provides a concise, step-by-step walkthrough to get a minimal application feature running using the castlecraft-engineer library. We'll build a simple Product creation feature.
Prerequisites
Before you begin, ensure you have:
1. Installed castlecraft-engineer as described in the Installation Guide.
2. Configured your environment variables, especially SQL_ASYNC_CONNECTION_STRING. For this quickstart, we'll use SQLite. Create a .env file in your project root with:
SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./quickstart.db"
# Optional: For seeing SQL statements
# ENABLE_SQL_LOG="true"
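Before going further, it can help to confirm that the connection string names an async driver. The helper below is a minimal sketch using only the standard library. The function name check_async_url is hypothetical and is not part of castlecraft-engineer; it only inspects the URL text and does not open a connection.

```python
import os


def check_async_url(url: str) -> str:
    """Return the driver part of a SQLAlchemy-style URL, e.g. 'aiosqlite'."""
    scheme, sep, _rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a database URL: {url!r}")
    _dialect, plus, driver = scheme.partition("+")
    if not plus or not driver:
        # A bare "sqlite://" URL names no driver, so it gives no hint of async support.
        raise ValueError(f"URL {url!r} names no explicit driver (expected e.g. sqlite+aiosqlite)")
    return driver


if __name__ == "__main__":
    url = os.environ.get("SQL_ASYNC_CONNECTION_STRING", "")
    print(check_async_url(url) if url else "SQL_ASYNC_CONNECTION_STRING is not set")
```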
Project Structure (Suggested)
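For this guide, let's imagine a simple project structure, using the file names created in the steps below:
your_project/
    .env
    commands.py
    di_setup.py
    handlers.py
    main.py
    models.py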
Step 1: Define the Domain Model (Aggregate and SQLModel)
First, let's define our Product aggregate and its corresponding ProductModel for database persistence.
The Aggregate base class handles id, version, and uncommitted_events.
The version field in ProductModel is used by AggregateRepository for optimistic concurrency control.
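To make the role of version concrete, here is a minimal in-memory sketch of optimistic concurrency control. It is not the library's implementation: InMemoryStore, Row, and ConcurrencyError are hypothetical names, and a real repository would perform the equivalent check against the database row.

```python
from dataclasses import dataclass
from typing import Dict


class ConcurrencyError(Exception):
    """Raised when the stored version differs from the one the caller loaded."""


@dataclass(frozen=True)
class Row:
    name: str
    version: int


class InMemoryStore:
    def __init__(self) -> None:
        self._rows: Dict[str, Row] = {}

    def insert(self, key: str, name: str) -> Row:
        row = Row(name=name, version=1)
        self._rows[key] = row
        return row

    def update(self, key: str, name: str, expected_version: int) -> Row:
        current = self._rows[key]
        if current.version != expected_version:
            # Someone else saved in between; refuse to overwrite their change.
            raise ConcurrencyError(
                f"expected version {expected_version}, found {current.version}"
            )
        row = Row(name=name, version=expected_version + 1)
        self._rows[key] = row
        return row
```

A writer that loaded version 1 succeeds once and moves the row to version 2; a second writer still holding version 1 is rejected and must reload before retrying.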
Create models.py:
import uuid
from typing import List, Optional

from sqlmodel import SQLModel, Field

from castlecraft_engineer.abstractions.aggregate import Aggregate, Event


class ProductModel(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str = Field(index=True)
    version: int = Field(default=1)  # For optimistic concurrency


class ProductCreated(Event):
    product_id: uuid.UUID
    name: str


class Product(Aggregate[uuid.UUID, ProductModel]):
    def __init__(self, id: uuid.UUID, name: str, version: int = 1):
        super().__init__(id, version)
        self.name = name

    @classmethod
    def create(cls, name: str) -> "Product":
        product_id = uuid.uuid4()
        # In a real scenario, you might validate the name or other business rules here.
        product = cls(id=product_id, name=name, version=1)
        product.record_event(ProductCreated(product_id=product.id, name=product.name))
        return product

    def to_model(self) -> ProductModel:
        return ProductModel(id=self.id, name=self.name, version=self.version)

    @classmethod
    def from_model(cls, model: ProductModel) -> "Product":
        return cls(id=model.id, name=model.name, version=model.version)
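The create() factory relies on the aggregate buffering events until something collects them. The following stand-alone sketch illustrates that buffering pattern under stated assumptions: SketchAggregate is a hypothetical stand-in, the event here is a plain dataclass, and the actual Aggregate base class may differ in detail. Only the method names record_event and pull_uncommitted_events are taken from this guide.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class ProductCreated:
    product_id: str
    name: str


class SketchAggregate:
    """Hypothetical stand-in showing how uncommitted events can be buffered."""

    def __init__(self, id: str, version: int = 1) -> None:
        self.id = id
        self.version = version
        self._uncommitted: List[Any] = []

    def record_event(self, event: Any) -> None:
        # Events are only remembered here; nothing is published yet.
        self._uncommitted.append(event)

    def pull_uncommitted_events(self) -> List[Any]:
        # Hand the buffered events to the caller and clear the buffer,
        # so the same event is not returned twice.
        events, self._uncommitted = self._uncommitted, []
        return events
```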
Step 2: Define the Command
Next, define a command to represent the intention to create a new product.
Create commands.py:
from dataclasses import dataclass

from castlecraft_engineer.abstractions.command import Command


@dataclass(frozen=True)
class CreateProductCommand(Command):
    name: str
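Declaring the command with frozen=True makes it immutable, which suits an object that only carries intent from caller to handler. The sketch below demonstrates the effect with the standard library alone; the empty Command class is a hypothetical stand-in for the library's base class so the example runs without castlecraft-engineer installed.

```python
from dataclasses import FrozenInstanceError, dataclass


class Command:
    """Hypothetical stand-in for the library's Command base class."""


@dataclass(frozen=True)
class CreateProductCommand(Command):
    name: str


cmd = CreateProductCommand(name="Gadget")

try:
    cmd.name = "Other"  # Assignment on a frozen dataclass instance is rejected.
    mutated = True
except FrozenInstanceError:
    mutated = False

# Frozen dataclasses still compare by value.
same = cmd == CreateProductCommand(name="Gadget")
```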
Step 3: Implement the Command Handler and Repository
The command handler will process the CreateProductCommand. It uses an AsyncAggregateRepository to persist the Product aggregate. The ProductRepository is a concrete implementation of this repository.
Create handlers.py:
import uuid

from sqlalchemy.ext.asyncio import AsyncSession

from castlecraft_engineer.abstractions.aggregate import AsyncAggregateRepository
from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.event_publisher import ExternalEventPublisher

from .models import Product, ProductModel  # Assuming models.py is in the same directory
from .commands import CreateProductCommand  # Assuming commands.py is in the same directory


class ProductRepository(AsyncAggregateRepository[uuid.UUID, Product, ProductModel]):
    # The base AsyncAggregateRepository provides all necessary CRUD and save logic
    # for aggregates when given the Aggregate type and SQLModel type.
    pass


class CreateProductHandler(CommandHandler[CreateProductCommand, None]):
    def __init__(self, session: AsyncSession, product_repo: ProductRepository, event_publisher: ExternalEventPublisher):
        self.session = session
        self.product_repo = product_repo
        self.event_publisher = event_publisher  # Example of another injected dependency

    async def execute(self, command: CreateProductCommand) -> None:
        product = Product.create(name=command.name)
        await self.product_repo.save(product, self.session)
        # In a more complete application, you would typically publish events
        # after the session has been successfully committed.
        # For example:
        # async with self.session.begin_nested():  # If using nested transactions for save
        #     await self.product_repo.save(product, self.session)
        # # After commit (or as part of a unit of work pattern):
        # for event in product.pull_uncommitted_events():
        #     await self.event_publisher.publish(event)
        print(f"Product '{product.name}' (ID: {product.id}, Version: {product.version}) created and saved.")

    async def authorize(self, command: CreateProductCommand) -> None:
        # For this quickstart, we'll allow all operations.
        # In a real application, you would implement authorization logic here.
        pass
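The commented-out lines in execute() hint at an ordering rule: publish events only after the commit has succeeded, so subscribers never hear about a product that was not stored. The sketch below illustrates that ordering with fakes. FakeSession, RecordingPublisher, and save_then_publish are hypothetical names introduced for illustration and are not part of castlecraft-engineer or SQLAlchemy.

```python
import asyncio
from typing import Any, List


class RecordingPublisher:
    """Fake publisher that remembers what it was asked to publish."""

    def __init__(self) -> None:
        self.published: List[Any] = []

    async def publish(self, event: Any) -> None:
        self.published.append(event)


class FakeSession:
    """Fake session whose commit can be made to fail on demand."""

    def __init__(self, fail_on_commit: bool = False) -> None:
        self.fail_on_commit = fail_on_commit
        self.committed = False

    async def commit(self) -> None:
        if self.fail_on_commit:
            raise RuntimeError("commit failed")
        self.committed = True


async def save_then_publish(session: FakeSession, events: List[Any], publisher: RecordingPublisher) -> None:
    # Commit first. If this raises, the loop below never runs,
    # so no event is published for data that was not persisted.
    await session.commit()
    for event in events:
        await publisher.publish(event)


if __name__ == "__main__":
    publisher = RecordingPublisher()
    asyncio.run(save_then_publish(FakeSession(), ["ProductCreated"], publisher))
    print(publisher.published)
```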
Step 4: Configure Dependency Injection
We'll use ContainerBuilder from common/di.py to register our components. castlecraft-engineer uses punq for dependency injection.
The builder.with_async_database() method will set up the AsyncSession factory based on your SQL_ASYNC_CONNECTION_STRING environment variable.
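If constructor injection is unfamiliar, the idea can be sketched in a few lines: a container maps a requested type to an implementation and builds it by resolving each annotated constructor parameter in turn. The code below is an illustrative toy, not punq's API and not ContainerBuilder's; TinyContainer and the sample classes are hypothetical.

```python
import inspect
from typing import Any, Dict, Optional


class TinyContainer:
    """Toy container: resolves constructor parameters by their type annotations."""

    def __init__(self) -> None:
        self._registry: Dict[type, type] = {}

    def register(self, service: type, impl: Optional[type] = None) -> None:
        # Registering a class without an implementation maps it to itself.
        self._registry[service] = impl or service

    def resolve(self, service: type) -> Any:
        impl = self._registry[service]
        # Every constructor parameter must be annotated with a registered type;
        # an unannotated or unregistered parameter raises KeyError in this sketch.
        kwargs = {
            name: self.resolve(param.annotation)
            for name, param in inspect.signature(impl).parameters.items()
        }
        return impl(**kwargs)


class Publisher:
    pass


class MockPublisher(Publisher):
    pass


class Handler:
    def __init__(self, publisher: Publisher) -> None:
        self.publisher = publisher


container = TinyContainer()
container.register(Publisher, MockPublisher)  # abstract type -> concrete implementation
container.register(Handler)
handler = container.resolve(Handler)
```

Here the handler asks for a Publisher and receives a MockPublisher, which mirrors how this guide swaps in a mock event publisher.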
Create di_setup.py:
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.event_publisher import ExternalEventPublisher
from castlecraft_engineer.testing.mock_event_publisher import MockExternalEventPublisher  # Using a mock for quickstart

from .handlers import CreateProductHandler, ProductRepository  # Assuming local imports
from .commands import CreateProductCommand


def configure_container() -> ContainerBuilder:
    builder = ContainerBuilder()

    # 1. Database: Registers AsyncSession factory
    builder.with_async_database()

    # 2. Repositories
    builder.register(ProductRepository)

    # 3. Event Publisher (using a mock for this example)
    builder.register(ExternalEventPublisher, MockExternalEventPublisher)

    # 4. Command Bus and Handlers
    builder.with_command_bus()  # Registers the CommandBus itself
    # Register our specific handler for the CreateProductCommand
    builder.command_bus.register(CreateProductCommand, CreateProductHandler)

    return builder


# --- Database Schema Bootstrapping ---
# This part is for setting up the database table for the first time.
from sqlalchemy.ext.asyncio import create_async_engine
from castlecraft_engineer.common.env import SQL_ASYNC_CONNECTION_STRING
from sqlmodel import SQLModel

# Crucially, import your SQLModel classes here so SQLModel.metadata knows about them.
from .models import ProductModel  # noqa


async def bootstrap_schema():
    # Ensure SQL_ASYNC_CONNECTION_STRING is set in your .env file
    # e.g., SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./quickstart.db"
    engine = create_async_engine(SQL_ASYNC_CONNECTION_STRING, echo=False)  # Set echo=True to see SQL
    async with engine.begin() as conn:
        # This creates tables for all models registered with SQLModel.metadata
        await conn.run_sync(SQLModel.metadata.create_all)
    await engine.dispose()  # Dispose of the engine after use
    print("Database schema bootstrapped (if not already existing).")
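The "if not already existing" wording reflects that bootstrapping is meant to be safe to run repeatedly. The sketch below shows the same idea with the standard-library sqlite3 module and CREATE TABLE IF NOT EXISTS. The table name and column types are assumptions chosen for illustration; the schema that SQLModel actually generates for ProductModel may differ.

```python
import sqlite3

# Assumed table layout for illustration only.
DDL = (
    "CREATE TABLE IF NOT EXISTS productmodel ("
    "id TEXT PRIMARY KEY, name TEXT NOT NULL, version INTEGER NOT NULL)"
)


def bootstrap(conn: sqlite3.Connection) -> None:
    # IF NOT EXISTS makes the statement a no-op when the table is already there.
    conn.execute(DDL)
    conn.commit()


conn = sqlite3.connect(":memory:")
bootstrap(conn)
conn.execute("INSERT INTO productmodel VALUES ('p1', 'Gadget', 1)")
conn.commit()

bootstrap(conn)  # Second run: the table is untouched and existing rows survive.
row_count = conn.execute("SELECT COUNT(*) FROM productmodel").fetchone()[0]
```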
Step 5: Execute the Command
Finally, let's create a script to initialize the DI container, get the command bus, and execute our command.
Create main.py:
import asyncio
import os
import traceback

from dotenv import load_dotenv

from .commands import CreateProductCommand  # Assuming local imports
from .di_setup import configure_container, bootstrap_schema
from castlecraft_engineer.abstractions.command_bus import CommandBus


async def run_quickstart():
    # Load environment variables from .env file (optional, good practice)
    load_dotenv()

    # Check if the database connection string is set
    if not os.getenv("SQL_ASYNC_CONNECTION_STRING"):
        print("Error: SQL_ASYNC_CONNECTION_STRING is not set in your .env file.")
        print("Please set it, e.g., SQL_ASYNC_CONNECTION_STRING='sqlite+aiosqlite:///./quickstart.db'")
        return

    # 1. Bootstrap database schema (run once, or ensure it's idempotent)
    # Make sure ProductModel (and any other models) are imported in di_setup.py
    # before bootstrap_schema is called so SQLModel.metadata is populated.
    await bootstrap_schema()

    # 2. Configure and build DI container
    container_builder = configure_container()
    container = container_builder.build()
    print("DI container configured and built.")

    # 3. Resolve the CommandBus from the container
    command_bus = container.resolve(CommandBus)

    # 4. Create and execute the command
    product_name = "Super Whizbang Gadget"
    create_cmd = CreateProductCommand(name=product_name)
    print(f"Attempting to execute command: CreateProductCommand(name='{product_name}')")
    try:
        await command_bus.execute(create_cmd)
        print("Command executed successfully.")
    except Exception as e:
        print(f"An error occurred during command execution: {e}")
        traceback.print_exc()

    # Optional: Verify creation (the handler already prints a message)
    # In a real application, you might query the database here using a QueryHandler
    # or directly for a quick check.
    print("Quickstart finished.")


if __name__ == "__main__":
    asyncio.run(run_quickstart())
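Conceptually, a command bus looks up the handler registered for a command's type and awaits its execute() method. The sketch below shows that dispatch step in isolation. SketchCommandBus is a hypothetical toy and not the library's CommandBus, which also integrates with the DI container and the handler's authorize() hook in ways not shown here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class CreateProductCommand:
    name: str


class CreateProductHandler:
    async def execute(self, command: CreateProductCommand) -> str:
        return f"created {command.name}"


class SketchCommandBus:
    """Toy bus: maps a command type to a factory that builds its handler."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], Any]] = {}

    def register(self, command_type: type, handler_factory: Callable[[], Any]) -> None:
        self._factories[command_type] = handler_factory

    async def execute(self, command: Any) -> Any:
        try:
            factory = self._factories[type(command)]
        except KeyError:
            raise LookupError(f"no handler registered for {type(command).__name__}") from None
        # A fresh handler is built per command, then asked to process it.
        return await factory().execute(command)


if __name__ == "__main__":
    bus = SketchCommandBus()
    bus.register(CreateProductCommand, CreateProductHandler)
    print(asyncio.run(bus.execute(CreateProductCommand(name="Gadget"))))
```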
To run this quickstart:
1. Ensure you have a .env file in the same directory as main.py with SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./quickstart.db".
2. Make sure python-dotenv is installed (uv pip install python-dotenv or pip install python-dotenv) if you keep the load_dotenv() line.
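3. Run python -m your_project.main from the directory that contains your_project/. The modules use relative imports (for example, from .commands import CreateProductCommand), so running python main.py directly fails with an ImportError.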
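You should see, in order, the schema bootstrapping message, the DI container message, the handler's confirmation that the product was created and saved, and finally "Quickstart finished."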
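For a quick check that a row really landed in the SQLite file, the standard-library sqlite3 module is enough. The helpers below are a minimal sketch; list_tables and count_rows are hypothetical names. The table name is an assumption: SQLModel derives a default table name from the lowercased class name, so productmodel is the likely candidate, but list_tables lets you confirm it rather than guess.

```python
import sqlite3
from contextlib import closing
from typing import List


def list_tables(db_path: str) -> List[str]:
    """Return the names of all tables in the SQLite file at db_path."""
    # Note: sqlite3.connect creates an empty file if db_path does not exist.
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


def count_rows(db_path: str, table: str) -> int:
    """Count rows in a table, refusing names that are not actual tables."""
    if table not in list_tables(db_path):
        raise ValueError(f"no such table: {table!r}")
    with closing(sqlite3.connect(db_path)) as conn:
        # The name was validated against sqlite_master above, so quoting it is safe here.
        return conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
```

After a successful run, calling list_tables on quickstart.db (located relative to the directory you ran the command from) should show the product table, and count_rows on that table should increase by one per run.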
Conclusion
This quickstart demonstrated the basic flow of defining domain models, commands, handlers, setting up dependency injection, and executing a command using castlecraft-engineer. From here, you can explore more advanced concepts like Queries, Events, Authorization, and more detailed Repository patterns covered in other sections of the documentation.