Quickstart Guide¶
This guide provides a concise, step-by-step walkthrough to get a minimal application feature running using the castlecraft-engineer
library. We'll build a simple Product creation feature.
Prerequisites¶
Before you begin, ensure you have:
1. Installed castlecraft-engineer
as described in the Installation Guide.
2. Configured your environment variables, especially SQL_ASYNC_CONNECTION_STRING
. For this quickstart, we'll use SQLite. Create a .env
file in your project root with:
SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./quickstart.db"
# Optional: For seeing SQL statements
# ENABLE_SQL_LOG="true"
Project Structure (Suggested)¶
For this guide, let's imagine a simple project structure:
Step 1: Define the Domain Model (Aggregate and SQLModel)¶
First, let's define our Product
aggregate and its corresponding ProductModel
for database persistence.
The Aggregate
base class (which Product
will inherit from) typically handles id
, version
, and a mechanism for recording uncommitted events (e.g., _record_event
).
The version
field in ProductModel
is used by the AsyncAggregateRepository
for optimistic concurrency control.
Create models.py
:
import uuid
from dataclasses import dataclass
from typing import Optional
from sqlmodel import SQLModel, Field
from castlecraft_engineer.abstractions.aggregate import Aggregate
class ProductModel(SQLModel, table=True):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
name: str = Field(index=True)
version: int = Field(default=1) # For optimistic concurrency
@dataclass(frozen=True)
class ProductCreated:
product_id: uuid.UUID
name: str
creator_id: uuid.UUID
class Product(Aggregate[ProductModel]):
name: str
def __init__(self, id: uuid.UUID, name: str, version: int):
super().__init__(id=id, version=version) # Pass id and version to base Aggregate
self.name = name
@classmethod
def create(cls, name: str, creator_id: uuid.UUID) -> "Product":
product_id = uuid.uuid4()
if not name:
raise ValueError("Product name cannot be empty.")
# Initial version for a new aggregate is typically 1
instance = cls(id=product_id, name=name, version=1)
# Assuming _record_event is a method on the base Aggregate class
instance._record_event(ProductCreated(product_id=instance.id, name=instance.name, creator_id=creator_id))
return instance
def to_model(self) -> ProductModel:
"""Converts the aggregate to its persistence model."""
return ProductModel(id=self.id, name=self.name, version=self.version)
@classmethod
def from_model(cls, model: ProductModel) -> "Product":
"""Creates an aggregate instance from its persistence model."""
instance = cls(id=model.id, name=model.name, version=model.version)
return instance
Step 2: Define the Command¶
Next, define a command to represent the intention to create a new product. Commands should be immutable.
Create commands.py
:
import uuid # Added for creator_id type hint
from dataclasses import dataclass
from castlecraft_engineer.abstractions.command import Command
@dataclass(frozen=True)
class CreateProductCommand(Command):
name: str
creator_id: uuid.UUID
Step 3: Implement the Command Handler and Repository¶
The command handler processes the CreateProductCommand
. It uses an AsyncAggregateRepository
to persist the Product
aggregate. The ProductRepository
is a concrete implementation of this repository, specifying the aggregate and model types it works with.
Create handlers.py
:
import uuid
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from castlecraft_engineer.abstractions.repository import AsyncAggregateRepository
from castlecraft_engineer.abstractions.command_handler import CommandHandler
from castlecraft_engineer.abstractions.event_publisher import ExternalEventPublisher
from .models import Product, ProductModel
from .commands import CreateProductCommand
class ProductRepository(AsyncAggregateRepository[uuid.UUID, Product, ProductModel]):
def __init__(self):
super().__init__(
aggregate_cls=Product,
model_cls=ProductModel,
)
class CreateProductHandler(CommandHandler[CreateProductCommand]):
def __init__(self, session: AsyncSession, product_repo: ProductRepository, event_publisher: ExternalEventPublisher):
self.session = session
self.product_repo = product_repo
self.event_publisher = event_publisher
async def execute(self, command: CreateProductCommand) -> None: # Or -> Any
product = Product.create(name=command.name, creator_id=command.creator_id)
await self.product_repo.save(product, self.session)
# In a real application, events are typically published after successful UoW commit.
# for event in product.pull_uncommitted_events():
# await self.event_publisher.publish(event)
print(f"Product '{product.name}' (ID: {product.id}, Version: {product.version}) created and saved.")
async def authorize(self, command: CreateProductCommand) -> bool:
# For this quickstart, we'll allow all operations.
return True
Step 4: Configure Dependency Injection¶
We'll use ContainerBuilder
from castlecraft_engineer.common.di
to register components. castlecraft-engineer
uses punq
for DI.
Create di_setup.py
:
from castlecraft_engineer.common.di import ContainerBuilder
from castlecraft_engineer.abstractions.event_publisher import ExternalEventPublisher
from castlecraft_engineer.testing.mock_event_publisher import MockExternalEventPublisher
from .handlers import CreateProductHandler, ProductRepository
from .commands import CreateProductCommand
def configure_container() -> ContainerBuilder:
builder = ContainerBuilder()
builder.with_async_database()
builder.register(ProductRepository)
builder.register(ExternalEventPublisher, MockExternalEventPublisher)
builder.with_command_bus()
builder.command_bus.register(CreateProductCommand, CreateProductHandler)
return builder
# --- Database Schema Bootstrapping ---
from sqlalchemy.ext.asyncio import create_async_engine
from castlecraft_engineer.common.env import SQL_ASYNC_CONNECTION_STRING
from sqlmodel import SQLModel
from .models import ProductModel # Ensures ProductModel is known to SQLModel.metadata
async def bootstrap_schema():
engine = create_async_engine(SQL_ASYNC_CONNECTION_STRING, echo=False)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
await engine.dispose()
print("Database schema bootstrapped (if not already existing).")
Step 5: Execute the Command¶
Create main.py
:
import asyncio
import os
import uuid # Added for creator_id
from dotenv import load_dotenv
from .commands import CreateProductCommand
from .di_setup import configure_container, bootstrap_schema
from castlecraft_engineer.abstractions.command_bus import CommandBus
async def run_quickstart():
load_dotenv()
if not os.getenv("SQL_ASYNC_CONNECTION_STRING"):
print("Error: SQL_ASYNC_CONNECTION_STRING is not set.")
return
await bootstrap_schema()
container = configure_container().build()
print("DI container configured and built.")
command_bus = container.resolve(CommandBus)
product_name = "Super Whizbang Gadget"
current_user_id = uuid.uuid4() # Example creator ID
create_cmd = CreateProductCommand(name=product_name, creator_id=current_user_id)
print(f"Executing command: {create_cmd}")
try:
await command_bus.execute(create_cmd)
print("Command executed successfully.")
except Exception as e:
print(f"Error during command execution: {e}")
import traceback
traceback.print_exc()
print("Quickstart finished.")
if __name__ == "__main__":
asyncio.run(run_quickstart())
To run this quickstart:
1. Ensure a .env
file with SQL_ASYNC_CONNECTION_STRING="sqlite+aiosqlite:///./quickstart.db"
.
2. Install python-dotenv
if using load_dotenv()
.
3. Run python main.py
from your_project/
.
You should see output indicating product creation and schema bootstrapping.
Conclusion¶
This quickstart demonstrated the basic flow of defining domain models, commands, handlers, setting up dependency injection, and executing a command using castlecraft-engineer
. From here, you can explore more advanced concepts like Queries, Events, Authorization, and more detailed Repository patterns covered in other sections of the documentation.