Skip to content

castlecraft_engineer.common.di

castlecraft_engineer.common.di

ContainerBuilder

Builds the DI container progressively.

Source code in src/castlecraft_engineer/common/di.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class ContainerBuilder:
    """
    Builds the DI container progressively.
    """

    def __init__(self) -> None:
        self._logger = logger
        self._container = punq.Container()
        self._db_registered = False
        self._async_db_registered = False
        self._cache_registered = False
        self._async_cache_registered = False
        self._authentication_registered = False
        self._command_bus_registered = False
        self._query_bus_registered = False
        self._event_bus_registered = False
        self._authorization_registered = False
        self.command_bus: Optional["CommandBus"] = None
        self.query_bus: Optional["QueryBus"] = None
        self.event_bus: Optional["EventBus"] = None
        self._container.register(punq.Container, instance=self._container)
        self._logger.info("Initialized ContainerBuilder.")

    def with_database(self) -> "ContainerBuilder":
        """
        Registers database connection and components
        """
        if self._db_registered:
            self._logger.warning(
                "Database components already registered. Skipping.",
            )
            return self

        self._logger.info("Registering synchronous database components...")

        try:
            from sqlalchemy import Engine
            from sqlalchemy.orm import Session, sessionmaker

            from castlecraft_engineer.database.connection import (
                SyncSessionFactory,
                get_engine,
            )

            sync_engine = get_engine()
            self._container.register(
                Engine,
                instance=sync_engine,
                name="db_sync_engine",
            )

            self._container.register(
                sessionmaker[Session],
                instance=SyncSessionFactory,
                name="db_sync_session_factory",
            )

            self._container.register(
                Session,
                factory=lambda: SyncSessionFactory(),
            )

            self._db_registered = True
            self._logger.info("Synchronous database components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register synchronous database components: {e}",
                exc_info=True,
            )

        return self

    def with_async_database(self) -> "ContainerBuilder":
        """
        Registers asynchronous database
        connection and components.
        """
        if self._async_db_registered:
            self._logger.warning(
                "Asynchronous database components already registered. Skipping.",  # noqa: E501
            )
            return self

        self._logger.info("Registering asynchronous database components...")

        try:
            from sqlalchemy.ext.asyncio import (
                AsyncEngine,
                AsyncSession,
                async_sessionmaker,
            )

            from castlecraft_engineer.database.connection import (
                AsyncSessionFactory,
                get_async_engine,
            )

            async_engine = get_async_engine()
            self._container.register(
                AsyncEngine, instance=async_engine, name="db_async_engine"
            )

            self._container.register(
                async_sessionmaker[AsyncSession],
                instance=AsyncSessionFactory,
                name="db_async_session_factory",
            )

            self._container.register(
                AsyncSession, factory=lambda: AsyncSessionFactory()
            )

            self._async_db_registered = True
            self._logger.info("Asynchronous database components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register asynchronous database components: {e}",
                exc_info=True,
            )

        return self

    def with_cache(self, is_async: bool = False) -> "ContainerBuilder":
        """
        Registers Cache connection and components.

        Args:
            is_async: If True, registers the asynchronous Redis client.
                      If False (default), registers the synchronous client.
        """
        if is_async and self._async_cache_registered:
            self._logger.warning(
                "Asynchronous cache components already registered. Skipping.",
            )
            return self
        if not is_async and self._cache_registered:
            self._logger.warning(
                "Synchronous cache components already registered. Skipping.",
            )
            return self

        if is_async:
            self._logger.info(
                "Registering asynchronous cache components...",
            )

            try:
                import redis.asyncio as aredis

                from castlecraft_engineer.caching.cache import (
                    get_redis_cache_async_connection,
                )

                # Safer: Register a factory
                self._container.register(
                    aredis.Redis,
                    factory=lambda **_: get_redis_cache_async_connection(),
                    scope=punq.Scope.singleton,
                    name="cache_async",
                )
                self._async_cache_registered = True
                self._logger.info(
                    "Asynchronous cache components registered (factory).",
                )
            except Exception as e:
                self._logger.error(
                    f"Failed to register asynchronous cache components: {e}",
                    exc_info=True,
                )
        else:
            self._logger.info("Registering synchronous cache components...")
            try:
                import redis

                from castlecraft_engineer.caching.cache import (
                    get_redis_cache_connection,
                )

                sync_cache_client = get_redis_cache_connection()
                self._container.register(
                    redis.Redis, instance=sync_cache_client, name="cache_sync"
                )
                self._cache_registered = True
                self._logger.info("Synchronous cache components registered.")
            except Exception as e:
                self._logger.error(
                    f"Failed to register synchronous cache components: {e}",
                    exc_info=True,
                )

        return self

    def with_authentication(self) -> "ContainerBuilder":
        """
        Registers Authentication connection and components
        """
        if self._authentication_registered:
            self._logger.warning(
                "Authentication components already registered. Skipping.",
            )
            return self

        self._logger.info(
            "Registering Authentication components (AuthenticationService)...",
        )

        try:
            from castlecraft_engineer.application.auth import AuthenticationService

            # Prefer async if registered, otherwise use sync if registered
            def auth_service_factory(
                container=self._container,
            ):
                sync_cache = None
                async_cache = None
                if self._async_cache_registered:
                    try:
                        import redis.asyncio as aredis

                        # Resolve the async client (trigger factory)
                        async_cache = container.resolve(
                            aredis.Redis,
                            name="cache_async",
                        )
                        self._logger.info(
                            "AuthenticationService will use asynchronous cache.",
                        )
                    except ImportError:
                        self._logger.info(
                            "aredis library not found for auth_service_factory. Async cache will not be used."  # noqa: E501
                        )
                    except Exception as e:
                        self._logger.error(
                            f"Failed to resolve async cache for AuthenticationService: {e}"
                        )
                if not async_cache and self._cache_registered:
                    try:
                        import redis

                        sync_cache = container.resolve(
                            redis.Redis,
                            name="cache_sync",
                        )
                        self._logger.info(
                            "AuthenticationService will use synchronous cache.",
                        )
                    except ImportError:
                        self._logger.info(
                            "redis library not found for auth_service_factory. Sync cache will not be used."  # noqa: E501
                        )
                    except Exception as e:
                        self._logger.error(
                            f"Failed to resolve sync cache for AuthenticationService: {e}"
                        )

                return AuthenticationService(
                    cache_client=sync_cache, async_cache_client=async_cache
                )

            self._container.register(
                AuthenticationService,
                factory=auth_service_factory,
                scope=punq.Scope.singleton,
            )
            self._authentication_registered = True
            self._logger.info("Authentication components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register Authentication components: {e}",
                exc_info=True,
            )

        return self

    def with_command_bus(self) -> "ContainerBuilder":
        """Registers the CommandBus as a singleton."""
        if self._command_bus_registered:
            self._logger.warning("CommandBus already registered. Skipping.")
            return self

        self._logger.info("Registering CommandBus...")
        try:
            from castlecraft_engineer.abstractions.command_bus import CommandBus

            self._container.register(
                CommandBus,
                factory=lambda c=self._container: CommandBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._command_bus_registered = True
            self.command_bus = self._container.resolve(CommandBus)
            self._logger.info("CommandBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register CommandBus: {e}", exc_info=True)
        return self

    def with_query_bus(self) -> "ContainerBuilder":
        """Registers the QueryBus as a singleton."""
        if self._query_bus_registered:
            self._logger.warning("QueryBus already registered. Skipping.")
            return self

        self._logger.info("Registering QueryBus...")
        try:
            from castlecraft_engineer.abstractions.query_bus import QueryBus

            self._container.register(
                QueryBus,
                factory=lambda c=self._container: QueryBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._query_bus_registered = True
            self.query_bus = self._container.resolve(QueryBus)
            self._logger.info("QueryBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register QueryBus: {e}", exc_info=True)
        return self

    def with_event_bus(self) -> "ContainerBuilder":
        """
        Registers the EventBus as a singleton.
        Note: Event handlers are typically registered directly with the
        EventBus instance after it's resolved, not via the DI container
        for the handlers themselves unless the EventBus is modified to
        resolve handlers.
        """
        if self._event_bus_registered:
            self._logger.warning("EventBus already registered. Skipping.")
            return self

        self._logger.info("Registering EventBus...")
        try:
            from castlecraft_engineer.abstractions.event_bus import EventBus

            self._container.register(
                EventBus,
                factory=lambda c=self._container: EventBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._event_bus_registered = True
            self.event_bus = self._container.resolve(EventBus)
            self._logger.info("EventBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register EventBus: {e}", exc_info=True)
        return self

    def with_authorization(self) -> "ContainerBuilder":
        """
        Registers Authorization connection and components
        """

        if not self._authentication_registered:
            self._logger.error(
                "Authentication components need to be registered. Skipping.",
            )
            return self

        if self._authorization_registered:
            self._logger.warning(
                "Authorization components already registered. Skipping.",
            )
            return self

        self._logger.info(
            "Setting up and registering Authorization components...",
        )

        try:
            from castlecraft_engineer.authorization.setup import setup_authorization

            setup_authorization(self._container)
            self._authorization_registered = True
            self._logger.info(
                "Authorization components set up and registered.",
            )
        except Exception as e:
            self._logger.error(
                f"Failed to set up authorization components: {e}",
                exc_info=True,
            )

        return self

    def register(
        self,
        type_or_name: Any,
        **kwargs,
    ) -> "ContainerBuilder":
        """Directly register a component."""
        self._container.register(type_or_name, **kwargs)
        return self

    def build(self) -> punq.Container:
        """Returns the configured container."""
        self._logger.info("DI container build complete.")
        return self._container

build()

Returns the configured container.

Source code in src/castlecraft_engineer/common/di.py
def build(self) -> punq.Container:
    """Returns the configured container."""
    self._logger.info("DI container build complete.")
    return self._container

register(type_or_name, **kwargs)

Directly register a component.

Source code in src/castlecraft_engineer/common/di.py
def register(
    self,
    type_or_name: Any,
    **kwargs,
) -> "ContainerBuilder":
    """Directly register a component."""
    self._container.register(type_or_name, **kwargs)
    return self

with_async_database()

Registers asynchronous database connection and components.

Source code in src/castlecraft_engineer/common/di.py
def with_async_database(self) -> "ContainerBuilder":
    """
    Registers asynchronous database
    connection and components.
    """
    if self._async_db_registered:
        self._logger.warning(
            "Asynchronous database components already registered. Skipping.",  # noqa: E501
        )
        return self

    self._logger.info("Registering asynchronous database components...")

    try:
        from sqlalchemy.ext.asyncio import (
            AsyncEngine,
            AsyncSession,
            async_sessionmaker,
        )

        from castlecraft_engineer.database.connection import (
            AsyncSessionFactory,
            get_async_engine,
        )

        async_engine = get_async_engine()
        self._container.register(
            AsyncEngine, instance=async_engine, name="db_async_engine"
        )

        self._container.register(
            async_sessionmaker[AsyncSession],
            instance=AsyncSessionFactory,
            name="db_async_session_factory",
        )

        self._container.register(
            AsyncSession, factory=lambda: AsyncSessionFactory()
        )

        self._async_db_registered = True
        self._logger.info("Asynchronous database components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register asynchronous database components: {e}",
            exc_info=True,
        )

    return self

with_authentication()

Registers Authentication connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_authentication(self) -> "ContainerBuilder":
    """
    Registers Authentication connection and components
    """
    if self._authentication_registered:
        self._logger.warning(
            "Authentication components already registered. Skipping.",
        )
        return self

    self._logger.info(
        "Registering Authentication components (AuthenticationService)...",
    )

    try:
        from castlecraft_engineer.application.auth import AuthenticationService

        # Prefer async if registered, otherwise use sync if registered
        def auth_service_factory(
            container=self._container,
        ):
            sync_cache = None
            async_cache = None
            if self._async_cache_registered:
                try:
                    import redis.asyncio as aredis

                    # Resolve the async client (trigger factory)
                    async_cache = container.resolve(
                        aredis.Redis,
                        name="cache_async",
                    )
                    self._logger.info(
                        "AuthenticationService will use asynchronous cache.",
                    )
                except ImportError:
                    self._logger.info(
                        "aredis library not found for auth_service_factory. Async cache will not be used."  # noqa: E501
                    )
                except Exception as e:
                    self._logger.error(
                        f"Failed to resolve async cache for AuthenticationService: {e}"
                    )
            if not async_cache and self._cache_registered:
                try:
                    import redis

                    sync_cache = container.resolve(
                        redis.Redis,
                        name="cache_sync",
                    )
                    self._logger.info(
                        "AuthenticationService will use synchronous cache.",
                    )
                except ImportError:
                    self._logger.info(
                        "redis library not found for auth_service_factory. Sync cache will not be used."  # noqa: E501
                    )
                except Exception as e:
                    self._logger.error(
                        f"Failed to resolve sync cache for AuthenticationService: {e}"
                    )

            return AuthenticationService(
                cache_client=sync_cache, async_cache_client=async_cache
            )

        self._container.register(
            AuthenticationService,
            factory=auth_service_factory,
            scope=punq.Scope.singleton,
        )
        self._authentication_registered = True
        self._logger.info("Authentication components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register Authentication components: {e}",
            exc_info=True,
        )

    return self

with_authorization()

Registers Authorization connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_authorization(self) -> "ContainerBuilder":
    """
    Registers Authorization connection and components
    """

    if not self._authentication_registered:
        self._logger.error(
            "Authentication components need to be registered. Skipping.",
        )
        return self

    if self._authorization_registered:
        self._logger.warning(
            "Authorization components already registered. Skipping.",
        )
        return self

    self._logger.info(
        "Setting up and registering Authorization components...",
    )

    try:
        from castlecraft_engineer.authorization.setup import setup_authorization

        setup_authorization(self._container)
        self._authorization_registered = True
        self._logger.info(
            "Authorization components set up and registered.",
        )
    except Exception as e:
        self._logger.error(
            f"Failed to set up authorization components: {e}",
            exc_info=True,
        )

    return self

with_cache(is_async=False)

Registers Cache connection and components.

Parameters:

Name Type Description Default
is_async bool

If True, registers the asynchronous Redis client. If False (default), registers the synchronous client.

False
Source code in src/castlecraft_engineer/common/di.py
def with_cache(self, is_async: bool = False) -> "ContainerBuilder":
    """
    Registers Cache connection and components.

    Args:
        is_async: If True, registers the asynchronous Redis client.
                  If False (default), registers the synchronous client.
    """
    if is_async and self._async_cache_registered:
        self._logger.warning(
            "Asynchronous cache components already registered. Skipping.",
        )
        return self
    if not is_async and self._cache_registered:
        self._logger.warning(
            "Synchronous cache components already registered. Skipping.",
        )
        return self

    if is_async:
        self._logger.info(
            "Registering asynchronous cache components...",
        )

        try:
            import redis.asyncio as aredis

            from castlecraft_engineer.caching.cache import (
                get_redis_cache_async_connection,
            )

            # Safer: Register a factory
            self._container.register(
                aredis.Redis,
                factory=lambda **_: get_redis_cache_async_connection(),
                scope=punq.Scope.singleton,
                name="cache_async",
            )
            self._async_cache_registered = True
            self._logger.info(
                "Asynchronous cache components registered (factory).",
            )
        except Exception as e:
            self._logger.error(
                f"Failed to register asynchronous cache components: {e}",
                exc_info=True,
            )
    else:
        self._logger.info("Registering synchronous cache components...")
        try:
            import redis

            from castlecraft_engineer.caching.cache import (
                get_redis_cache_connection,
            )

            sync_cache_client = get_redis_cache_connection()
            self._container.register(
                redis.Redis, instance=sync_cache_client, name="cache_sync"
            )
            self._cache_registered = True
            self._logger.info("Synchronous cache components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register synchronous cache components: {e}",
                exc_info=True,
            )

    return self

with_command_bus()

Registers the CommandBus as a singleton.

Source code in src/castlecraft_engineer/common/di.py
def with_command_bus(self) -> "ContainerBuilder":
    """Registers the CommandBus as a singleton."""
    if self._command_bus_registered:
        self._logger.warning("CommandBus already registered. Skipping.")
        return self

    self._logger.info("Registering CommandBus...")
    try:
        from castlecraft_engineer.abstractions.command_bus import CommandBus

        self._container.register(
            CommandBus,
            factory=lambda c=self._container: CommandBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._command_bus_registered = True
        self.command_bus = self._container.resolve(CommandBus)
        self._logger.info("CommandBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register CommandBus: {e}", exc_info=True)
    return self

with_database()

Registers database connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_database(self) -> "ContainerBuilder":
    """
    Registers database connection and components
    """
    if self._db_registered:
        self._logger.warning(
            "Database components already registered. Skipping.",
        )
        return self

    self._logger.info("Registering synchronous database components...")

    try:
        from sqlalchemy import Engine
        from sqlalchemy.orm import Session, sessionmaker

        from castlecraft_engineer.database.connection import (
            SyncSessionFactory,
            get_engine,
        )

        sync_engine = get_engine()
        self._container.register(
            Engine,
            instance=sync_engine,
            name="db_sync_engine",
        )

        self._container.register(
            sessionmaker[Session],
            instance=SyncSessionFactory,
            name="db_sync_session_factory",
        )

        self._container.register(
            Session,
            factory=lambda: SyncSessionFactory(),
        )

        self._db_registered = True
        self._logger.info("Synchronous database components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register synchronous database components: {e}",
            exc_info=True,
        )

    return self

with_event_bus()

Registers the EventBus as a singleton. Note: Event handlers are typically registered directly with the EventBus instance after it's resolved, not via the DI container for the handlers themselves unless the EventBus is modified to resolve handlers.

Source code in src/castlecraft_engineer/common/di.py
def with_event_bus(self) -> "ContainerBuilder":
    """
    Registers the EventBus as a singleton.
    Note: Event handlers are typically registered directly with the
    EventBus instance after it's resolved, not via the DI container
    for the handlers themselves unless the EventBus is modified to
    resolve handlers.
    """
    if self._event_bus_registered:
        self._logger.warning("EventBus already registered. Skipping.")
        return self

    self._logger.info("Registering EventBus...")
    try:
        from castlecraft_engineer.abstractions.event_bus import EventBus

        self._container.register(
            EventBus,
            factory=lambda c=self._container: EventBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._event_bus_registered = True
        self.event_bus = self._container.resolve(EventBus)
        self._logger.info("EventBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register EventBus: {e}", exc_info=True)
    return self

with_query_bus()

Registers the QueryBus as a singleton.

Source code in src/castlecraft_engineer/common/di.py
def with_query_bus(self) -> "ContainerBuilder":
    """Registers the QueryBus as a singleton."""
    if self._query_bus_registered:
        self._logger.warning("QueryBus already registered. Skipping.")
        return self

    self._logger.info("Registering QueryBus...")
    try:
        from castlecraft_engineer.abstractions.query_bus import QueryBus

        self._container.register(
            QueryBus,
            factory=lambda c=self._container: QueryBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._query_bus_registered = True
        self.query_bus = self._container.resolve(QueryBus)
        self._logger.info("QueryBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register QueryBus: {e}", exc_info=True)
    return self