Generic Model Repositories

castlecraft-engineer provides generic repository implementations for working directly with SQLModel entities. These are designed for simpler data access patterns, often used within Query Handlers or for entities that are not modeled as full Domain-Driven Design (DDD) Aggregates. This makes them distinct from AggregateRepository (detailed in the Aggregates concept page), which is tailored for managing the persistence of DDD Aggregates and includes logic for optimistic concurrency and event handling related to aggregates.

The primary classes for these simpler model repositories are:

  • ModelRepository[TModel] for synchronous operations.
  • AsyncModelRepository[TModel] for asynchronous operations.

Both classes are imported from castlecraft_engineer.database.repository. If that import fails in your installed version, the path may have moved in a refactoring; look for the classes alongside the other repository abstractions.

Core Features

Both ModelRepository and AsyncModelRepository offer a standard set of Create, Read, Update, and Delete (CRUD) operations for a given SQLModel type (TModel).

1. ModelRepository[TModel] (Synchronous)

This repository works with a synchronous SQLAlchemy/SQLModel Session.

Key Methods:

  • __init__(self, model_cls: Type[TModel])

    • The constructor takes the SQLModel class (model_cls) that this repository instance will manage.
  • get_by_id(self, session: Session, id: TID) -> Optional[TModel]

    • Retrieves a model instance by its primary key (id). TID is a type variable representing the ID's type.
    • Returns None if no model with the given ID is found.
  • get_all(self, session: Session, offset: int = 0, limit: int = 100, **filters: Any) -> List[TModel]

    • Retrieves a list of model instances.
    • offset: Number of records to skip (for pagination).
    • limit: Maximum number of records to return.
    • **filters: Keyword arguments where keys are model attribute names and values are the values to filter by (exact match). For example, name="Product A" would filter for models where the name attribute is "Product A".
  • create(self, session: Session, obj_in: TModel) -> TModel

    • Creates a new model instance in the database.
    • obj_in: An instance of the SQLModel to be created.
    • The session should typically be flushed or committed after this call to persist the changes.
  • update(self, session: Session, db_obj: TModel, obj_in: Union[TModel, Dict[str, Any]]) -> TModel

    • Updates an existing model instance.
    • db_obj: The existing SQLModel instance fetched from the database.
    • obj_in: Either another SQLModel instance or a dictionary containing the fields to update.
    • The session should typically be flushed or committed after this call.
  • delete_by_id(self, session: Session, id: TID) -> Optional[TModel]

    • Deletes a model instance by its ID.
    • Returns the deleted model instance if found and deleted, otherwise None.
    • The session should typically be flushed or committed after this call.
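
To make the method contract above concrete, here is a minimal in-memory sketch. It is not the library's implementation: InMemoryModelRepository and Product are hypothetical names, objects live in a plain dict, and the session parameter is left out because there is no database. It only illustrates the documented behavior: exact-match filters combined with offset and limit, dict-based updates, and None for missing IDs.

```python
from dataclasses import dataclass
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union

TModel = TypeVar("TModel")


@dataclass
class Product:
    # Hypothetical model used only for this illustration.
    id: int
    name: str
    price: float = 0.0


class InMemoryModelRepository(Generic[TModel]):
    """Hypothetical stand-in mirroring the documented method names.

    Rows are kept in a dict, so the `session` argument is omitted.
    """

    def __init__(self, model_cls: Type[TModel]) -> None:
        self.model_cls = model_cls
        self._rows: Dict[Any, TModel] = {}

    def create(self, obj_in: TModel) -> TModel:
        self._rows[obj_in.id] = obj_in
        return obj_in

    def get_by_id(self, id: Any) -> Optional[TModel]:
        # Missing IDs yield None rather than raising.
        return self._rows.get(id)

    def get_all(self, offset: int = 0, limit: int = 100, **filters: Any) -> List[TModel]:
        # Every filter is an exact match on an attribute; all must hold.
        matches = [
            row
            for row in self._rows.values()
            if all(getattr(row, key) == value for key, value in filters.items())
        ]
        # Pagination is applied after filtering.
        return matches[offset:offset + limit]

    def update(self, db_obj: TModel, obj_in: Union[TModel, Dict[str, Any]]) -> TModel:
        # Accept either a dict of changed fields or another model instance.
        changes = obj_in if isinstance(obj_in, dict) else vars(obj_in)
        for key, value in changes.items():
            setattr(db_obj, key, value)
        return db_obj

    def delete_by_id(self, id: Any) -> Optional[TModel]:
        # Returns the removed object, or None when nothing matched.
        return self._rows.pop(id, None)


repo = InMemoryModelRepository(Product)
repo.create(Product(id=1, name="Product A", price=10.0))
repo.create(Product(id=2, name="Product B", price=20.0))
repo.create(Product(id=3, name="Product A", price=30.0))

same_name = repo.get_all(name="Product A")
second_page = repo.get_all(offset=1, limit=1, name="Product A")
updated = repo.update(repo.get_by_id(2), {"price": 25.0})
deleted = repo.delete_by_id(3)
missing = repo.delete_by_id(99)
```

With the real repositories the same calls additionally take a session as the first argument, and writes only become durable once that session is flushed or committed.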

2. AsyncModelRepository[TModel] (Asynchronous)

This repository works with an AsyncSession from SQLAlchemy/SQLModel, making it suitable for asynchronous applications (e.g., using FastAPI). Its methods are asynchronous and mirror the synchronous versions.

Key Methods (Async Counterparts):

  • __init__(self, model_cls: Type[TModel]) (Same as synchronous)
  • async get_by_id(self, session: AsyncSession, id: TID) -> Optional[TModel]
  • async get_all(self, session: AsyncSession, offset: int = 0, limit: int = 100, **filters: Any) -> List[TModel]
  • async create(self, session: AsyncSession, obj_in: TModel) -> TModel
  • async update(self, session: AsyncSession, db_obj: TModel, obj_in: Union[TModel, Dict[str, Any]]) -> TModel
  • async delete_by_id(self, session: AsyncSession, id: TID) -> Optional[TModel]
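
The practical consequence of the async variant is that every repository call returns a coroutine and must be awaited. The following sketch shows this with a hypothetical in-memory stand-in (InMemoryAsyncRepository is not part of castlecraft-engineer, and the session argument is omitted); asyncio.sleep(0) merely simulates yielding to the event loop the way real database I/O would.

```python
import asyncio
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


class InMemoryAsyncRepository:
    """Hypothetical stand-in: same method names as above, but as coroutines."""

    def __init__(self) -> None:
        self._rows: Dict[Any, Row] = {}

    async def create(self, obj_in: Row) -> Row:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self._rows[obj_in["id"]] = obj_in
        return obj_in

    async def get_by_id(self, id: Any) -> Optional[Row]:
        await asyncio.sleep(0)
        return self._rows.get(id)

    async def get_all(self, offset: int = 0, limit: int = 100, **filters: Any) -> List[Row]:
        await asyncio.sleep(0)
        matches = [
            row
            for row in self._rows.values()
            if all(row.get(key) == value for key, value in filters.items())
        ]
        return matches[offset:offset + limit]


async def main():
    repo = InMemoryAsyncRepository()
    await repo.create({"id": 1, "name": "Books", "is_active": True})
    await repo.create({"id": 2, "name": "Toys", "is_active": False})

    found = await repo.get_by_id(1)
    active = await repo.get_all(is_active=True)

    # Without `await`, the call returns a coroutine object, not a row.
    pending = repo.get_by_id(1)
    forgot_await = asyncio.iscoroutine(pending)
    pending.close()  # discard it cleanly to avoid a "never awaited" warning
    return found, active, forgot_await


found, active, forgot_await = asyncio.run(main())
```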

How to Instantiate and Use

You typically create a specific repository class that inherits from ModelRepository or AsyncModelRepository, specifying your SQLModel type.

Example with AsyncModelRepository

Let's define a simple Category SQLModel and its corresponding repository.

import uuid
from typing import Optional  # The only typing name this example uses
from sqlmodel import SQLModel, Field # For SQLModel definition
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine # For async example

from castlecraft_engineer.database.repository import AsyncModelRepository
# from castlecraft_engineer.common.di import ContainerBuilder # For DI example later

# 1. Define your SQLModel
class Category(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    name: str = Field(index=True, unique=True)
    description: Optional[str] = None
    is_active: bool = Field(default=True)

# 2. Create a concrete repository class
class CategoryRepository(AsyncModelRepository[Category]):
    def __init__(self): # No session needed in constructor
        super().__init__(model_cls=Category)

    # You can add custom methods here if needed
    async def get_by_name(self, session: AsyncSession, name: str) -> Optional[Category]:
        # The get_all method supports filtering via keyword arguments
        results = await self.get_all(session, name=name, limit=1)
        return results[0] if results else None

# 3. Example Usage (typically in a Query Handler or service)
async def manage_categories(session: AsyncSession, category_repo: CategoryRepository):
    # Create a category
    new_category_data = Category(name="Electronics", description="Gadgets and devices")
    created_category = await category_repo.create(session, new_category_data)
    await session.commit() # Commit after creation
    print(f"Created category: {created_category.name} (ID: {created_category.id})")

    # Get a category by ID
    fetched_by_id = await category_repo.get_by_id(session, created_category.id)
    if fetched_by_id:
        print(f"Fetched by ID: {fetched_by_id.name}")

    # Get a category by custom method (name)
    fetched_by_name = await category_repo.get_by_name(session, "Electronics")
    if fetched_by_name:
        print(f"Fetched by name: {fetched_by_name.name}")

    # Update a category
    if fetched_by_id:
        updated_data = {"description": "All kinds of electronic gadgets and devices"}
        updated_category = await category_repo.update(session, fetched_by_id, updated_data)
        await session.commit() # Commit after update
        print(f"Updated category description: {updated_category.description}")

    # Get all active categories with filtering and pagination
    all_active_categories = await category_repo.get_all(session, offset=0, limit=10, is_active=True)
    print(f"Active categories ({len(all_active_categories)} found):")
    for cat in all_active_categories:
        print(f"- {cat.name}")

    # Delete a category
    # deleted_category = await category_repo.delete_by_id(session, created_category.id)
    # await session.commit() # Commit after deletion
    # if deleted_category:
    #     print(f"Deleted category: {deleted_category.name}")

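# --- Example Setup and Execution (simplified; no DI container) ---
async def run_async_example():
    # In a real app, use ContainerBuilder and get session from DI
    # Example URL. The SQLite file persists between runs, so re-running this
    # script would likely violate the unique constraint on Category.name
    # unless the row (or the file) is removed first.
    DATABASE_URL = "sqlite+aiosqlite:///./test_categories.db"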
    engine = create_async_engine(DATABASE_URL, echo=False)

    # Create tables (usually done once via bootstrap or migrations)
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    # Define a session factory for creating sessions
    from sqlalchemy.ext.asyncio import async_sessionmaker
    async_session_local_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    repo_instance = CategoryRepository() # Instantiate repository

    async with async_session_local_factory() as session:
        await manage_categories(session, repo_instance)

    await engine.dispose()

# To run the example (if you put this in a runnable script):
# import asyncio
# if __name__ == "__main__":
#     asyncio.run(run_async_example())

Dependency Injection: In a typical application using castlecraft-engineer, you would register your concrete repository (e.g., CategoryRepository) with the DI container. The AsyncSession (or Session) would also be registered (usually as a factory) and injected into your handlers or services, which then pass it to the repository methods.

# Example snippet for di_setup.py
# from .your_repositories_module import CategoryRepository
# from castlecraft_engineer.common.di import ContainerBuilder

# def configure_container(builder: ContainerBuilder):
#     # ... other registrations ...
#     builder.with_async_database() # Registers AsyncSession factory
#     builder.register(CategoryRepository) # Registers your specific repository
#     # ... other registrations ...
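
The flow described above (a handler receives the repository and a session factory, then passes a session to each repository call) can be sketched without the container. Everything here is a hypothetical stand-in: FakeSession, StubCategoryRepository, GetCategoryByNameHandler, and session_factory are illustrative names, and the manual wiring at the bottom takes the place of what the DI container would resolve.

```python
from contextlib import contextmanager
from typing import Callable, ContextManager, Dict, Iterator, List, Optional

Row = Dict[str, object]


class FakeSession:
    """Hypothetical stand-in for Session/AsyncSession; holds rows in memory."""

    def __init__(self, rows: List[Row]) -> None:
        self.rows = rows
        self.closed = False


class StubCategoryRepository:
    """Stateless stand-in repository: the session is passed to each method."""

    def get_by_name(self, session: FakeSession, name: str) -> Optional[Row]:
        return next((row for row in session.rows if row["name"] == name), None)


class GetCategoryByNameHandler:
    """Receives its collaborators through the constructor."""

    def __init__(
        self,
        repo: StubCategoryRepository,
        session_factory: Callable[[], ContextManager[FakeSession]],
    ) -> None:
        self._repo = repo
        self._session_factory = session_factory

    def execute(self, name: str) -> Optional[Row]:
        # One session per handler call, handed on to the repository method.
        with self._session_factory() as session:
            return self._repo.get_by_name(session, name)


opened_sessions: List[FakeSession] = []


@contextmanager
def session_factory() -> Iterator[FakeSession]:
    session = FakeSession(rows=[{"name": "Electronics", "is_active": True}])
    opened_sessions.append(session)
    try:
        yield session
    finally:
        session.closed = True  # a real factory would release the connection here


# Manual wiring; a container would resolve these two dependencies instead.
handler = GetCategoryByNameHandler(StubCategoryRepository(), session_factory)
found = handler.execute("Electronics")
not_found = handler.execute("Toys")
```

Keeping the repository free of session state is what lets a single registered instance be shared across handlers while each call still gets its own session.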

Use Cases

ModelRepository and AsyncModelRepository are best suited for:

  • Read Operations in Query Handlers: When you need to fetch data for display or reporting without complex domain logic or consistency boundaries of an Aggregate.
  • Managing Simpler Entities: For entities that don't have complex invariants or don't act as Aggregate Roots, direct model manipulation via these repositories can be simpler.
  • Supporting Data for Aggregates: Sometimes, you might need to fetch lookup data or related simple entities that are not part of an Aggregate's direct consistency boundary.

Singleton-like Configurations (database/settings_storage.py)

While not directly part of the generic ModelRepository, castlecraft-engineer also provides utilities for managing singleton-like configurations or settings stored in the database. These live in the castlecraft_engineer.database.settings_storage module (database/settings_storage.py), which includes the Singles model and the synchronous SettingsStorage class.

SettingsStorage is specialized for key-value-style storage of individual settings or simple configuration objects. It is useful for application-wide parameters that must be persisted but are not complex enough to warrant a full entity or aggregate model.

For example, you might store an API key or a feature flag status using the synchronous SettingsStorage:

# from castlecraft_engineer.database.settings_storage import SettingsStorage
# from sqlalchemy.orm import Session # For synchronous session

# def example_settings_usage(session: Session, settings_storage: SettingsStorage):
#     settings_storage.set_value(session, "EXTERNAL_API_KEY", "your_secret_key_here")
#     api_key = settings_storage.get_value(session, "EXTERNAL_API_KEY")
#     if api_key:
#         print(f"Retrieved API Key: {api_key}")
#     session.commit()

The SettingsStorage class offers get_value, set_value, get_typed_value, and set_typed_value methods for convenience.
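
To illustrate the general idea of key-value settings persisted in a database, here is a small sketch built on the standard-library sqlite3 module. It is not how SettingsStorage or the Singles model is implemented: the settings table, the JSON encoding, and the function names init_settings_table, set_value, and get_value are all assumptions made for this example.

```python
import json
import sqlite3
from typing import Any


def init_settings_table(conn: sqlite3.Connection) -> None:
    # One row per setting; the key is the primary key, so it stays unique.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS settings ("
        "key TEXT PRIMARY KEY, value TEXT NOT NULL)"
    )


def set_value(conn: sqlite3.Connection, key: str, value: Any) -> None:
    # Values are JSON-encoded so booleans and numbers keep their type.
    # INSERT OR REPLACE overwrites any existing row with the same key.
    conn.execute(
        "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
        (key, json.dumps(value)),
    )


def get_value(conn: sqlite3.Connection, key: str, default: Any = None) -> Any:
    row = conn.execute(
        "SELECT value FROM settings WHERE key = ?", (key,)
    ).fetchone()
    return json.loads(row[0]) if row else default


conn = sqlite3.connect(":memory:")
init_settings_table(conn)

set_value(conn, "FEATURE_X_ENABLED", True)
set_value(conn, "MAX_RETRIES", 3)
set_value(conn, "MAX_RETRIES", 5)  # overwrites the previous value
conn.commit()

flag = get_value(conn, "FEATURE_X_ENABLED")
retries = get_value(conn, "MAX_RETRIES")
missing = get_value(conn, "UNKNOWN", default="n/a")
row_count = conn.execute("SELECT COUNT(*) FROM settings").fetchone()[0]
```

Because each key maps to exactly one row, writing a setting twice replaces the old value instead of accumulating duplicates, which is the property that makes such storage singleton-like.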

Conclusion

The generic ModelRepository and AsyncModelRepository provide a convenient and standardized way to perform CRUD operations on your SQLModel entities. They are a valuable tool for simpler data access needs, complementing the more domain-focused AggregateRepository for managing complex Aggregates.