Skip to content

castlecraft_engineer.common

castlecraft_engineer.common

ContainerBuilder

Builds the DI container progressively.

Source code in src/castlecraft_engineer/common/di.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class ContainerBuilder:
    """
    Builds the DI container progressively.
    """

    def __init__(self) -> None:
        self._logger = logger
        self._container = punq.Container()
        self._db_registered = False
        self._async_db_registered = False
        self._cache_registered = False
        self._async_cache_registered = False
        self._authentication_registered = False
        self._command_bus_registered = False
        self._query_bus_registered = False
        self._event_bus_registered = False
        self._authorization_registered = False
        self.command_bus: Optional["CommandBus"] = None
        self.query_bus: Optional["QueryBus"] = None
        self.event_bus: Optional["EventBus"] = None
        self._container.register(punq.Container, instance=self._container)
        self._logger.info("Initialized ContainerBuilder.")

    def with_database(self) -> "ContainerBuilder":
        """
        Registers database connection and components
        """
        if self._db_registered:
            self._logger.warning(
                "Database components already registered. Skipping.",
            )
            return self

        self._logger.info("Registering synchronous database components...")

        try:
            from sqlalchemy import Engine
            from sqlalchemy.orm import Session, sessionmaker

            from castlecraft_engineer.database.connection import (
                SyncSessionFactory,
                get_engine,
            )

            sync_engine = get_engine()
            self._container.register(
                Engine,
                instance=sync_engine,
                name="db_sync_engine",
            )

            self._container.register(
                sessionmaker[Session],
                instance=SyncSessionFactory,
                name="db_sync_session_factory",
            )

            self._container.register(
                Session,
                factory=lambda: SyncSessionFactory(),
            )

            self._db_registered = True
            self._logger.info("Synchronous database components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register synchronous database components: {e}",
                exc_info=True,
            )

        return self

    def with_async_database(self) -> "ContainerBuilder":
        """
        Registers asynchronous database
        connection and components.
        """
        if self._async_db_registered:
            self._logger.warning(
                "Asynchronous database components already registered. Skipping.",  # noqa: E501
            )
            return self

        self._logger.info("Registering asynchronous database components...")

        try:
            from sqlalchemy.ext.asyncio import (
                AsyncEngine,
                AsyncSession,
                async_sessionmaker,
            )

            from castlecraft_engineer.database.connection import (
                AsyncSessionFactory,
                get_async_engine,
            )

            async_engine = get_async_engine()
            self._container.register(
                AsyncEngine, instance=async_engine, name="db_async_engine"
            )

            self._container.register(
                async_sessionmaker[AsyncSession],
                instance=AsyncSessionFactory,
                name="db_async_session_factory",
            )

            self._container.register(
                AsyncSession, factory=lambda: AsyncSessionFactory()
            )

            self._async_db_registered = True
            self._logger.info("Asynchronous database components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register asynchronous database components: {e}",
                exc_info=True,
            )

        return self

    def with_cache(self, is_async: bool = False) -> "ContainerBuilder":
        """
        Registers Cache connection and components.

        Args:
            is_async: If True, registers the asynchronous Redis client.
                      If False (default), registers the synchronous client.
        """
        if is_async and self._async_cache_registered:
            self._logger.warning(
                "Asynchronous cache components already registered. Skipping.",
            )
            return self
        if not is_async and self._cache_registered:
            self._logger.warning(
                "Synchronous cache components already registered. Skipping.",
            )
            return self

        if is_async:
            self._logger.info(
                "Registering asynchronous cache components...",
            )

            try:
                import redis.asyncio as aredis

                from castlecraft_engineer.caching.cache import (
                    get_redis_cache_async_connection,
                )

                # Safer: Register a factory
                self._container.register(
                    aredis.Redis,
                    factory=lambda **_: get_redis_cache_async_connection(),
                    scope=punq.Scope.singleton,
                    name="cache_async",
                )
                self._async_cache_registered = True
                self._logger.info(
                    "Asynchronous cache components registered (factory).",
                )
            except Exception as e:
                self._logger.error(
                    f"Failed to register asynchronous cache components: {e}",
                    exc_info=True,
                )
        else:
            self._logger.info("Registering synchronous cache components...")
            try:
                import redis

                from castlecraft_engineer.caching.cache import (
                    get_redis_cache_connection,
                )

                sync_cache_client = get_redis_cache_connection()
                self._container.register(
                    redis.Redis, instance=sync_cache_client, name="cache_sync"
                )
                self._cache_registered = True
                self._logger.info("Synchronous cache components registered.")
            except Exception as e:
                self._logger.error(
                    f"Failed to register synchronous cache components: {e}",
                    exc_info=True,
                )

        return self

    def with_authentication(self) -> "ContainerBuilder":
        """
        Registers Authentication connection and components
        """
        if self._authentication_registered:
            self._logger.warning(
                "Authentication components already registered. Skipping.",
            )
            return self

        self._logger.info(
            "Registering Authentication components (AuthenticationService)...",
        )

        try:
            from castlecraft_engineer.application.auth import AuthenticationService

            # Prefer async if registered, otherwise use sync if registered
            def auth_service_factory(
                container=self._container,
            ):
                sync_cache = None
                async_cache = None
                if self._async_cache_registered:
                    try:
                        import redis.asyncio as aredis

                        # Resolve the async client (trigger factory)
                        async_cache = container.resolve(
                            aredis.Redis,
                            name="cache_async",
                        )
                        self._logger.info(
                            "AuthenticationService will use asynchronous cache.",
                        )
                    except ImportError:
                        self._logger.info(
                            "aredis library not found for auth_service_factory. Async cache will not be used."  # noqa: E501
                        )
                    except Exception as e:
                        self._logger.error(
                            f"Failed to resolve async cache for AuthenticationService: {e}"
                        )
                if not async_cache and self._cache_registered:
                    try:
                        import redis

                        sync_cache = container.resolve(
                            redis.Redis,
                            name="cache_sync",
                        )
                        self._logger.info(
                            "AuthenticationService will use synchronous cache.",
                        )
                    except ImportError:
                        self._logger.info(
                            "redis library not found for auth_service_factory. Sync cache will not be used."  # noqa: E501
                        )
                    except Exception as e:
                        self._logger.error(
                            f"Failed to resolve sync cache for AuthenticationService: {e}"
                        )

                return AuthenticationService(
                    cache_client=sync_cache, async_cache_client=async_cache
                )

            self._container.register(
                AuthenticationService,
                factory=auth_service_factory,
                scope=punq.Scope.singleton,
            )
            self._authentication_registered = True
            self._logger.info("Authentication components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register Authentication components: {e}",
                exc_info=True,
            )

        return self

    def with_command_bus(self) -> "ContainerBuilder":
        """Registers the CommandBus as a singleton."""
        if self._command_bus_registered:
            self._logger.warning("CommandBus already registered. Skipping.")
            return self

        self._logger.info("Registering CommandBus...")
        try:
            from castlecraft_engineer.abstractions.command_bus import CommandBus

            self._container.register(
                CommandBus,
                factory=lambda c=self._container: CommandBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._command_bus_registered = True
            self.command_bus = self._container.resolve(CommandBus)
            self._logger.info("CommandBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register CommandBus: {e}", exc_info=True)
        return self

    def with_query_bus(self) -> "ContainerBuilder":
        """Registers the QueryBus as a singleton."""
        if self._query_bus_registered:
            self._logger.warning("QueryBus already registered. Skipping.")
            return self

        self._logger.info("Registering QueryBus...")
        try:
            from castlecraft_engineer.abstractions.query_bus import QueryBus

            self._container.register(
                QueryBus,
                factory=lambda c=self._container: QueryBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._query_bus_registered = True
            self.query_bus = self._container.resolve(QueryBus)
            self._logger.info("QueryBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register QueryBus: {e}", exc_info=True)
        return self

    def with_event_bus(self) -> "ContainerBuilder":
        """
        Registers the EventBus as a singleton.
        Note: Event handlers are typically registered directly with the
        EventBus instance after it's resolved, not via the DI container
        for the handlers themselves unless the EventBus is modified to
        resolve handlers.
        """
        if self._event_bus_registered:
            self._logger.warning("EventBus already registered. Skipping.")
            return self

        self._logger.info("Registering EventBus...")
        try:
            from castlecraft_engineer.abstractions.event_bus import EventBus

            self._container.register(
                EventBus,
                factory=lambda c=self._container: EventBus(container=c),
                scope=punq.Scope.singleton,
            )
            self._event_bus_registered = True
            self.event_bus = self._container.resolve(EventBus)
            self._logger.info("EventBus registered as singleton.")
        except Exception as e:
            self._logger.error(f"Failed to register EventBus: {e}", exc_info=True)
        return self

    def with_authorization(self) -> "ContainerBuilder":
        """
        Registers Authorization connection and components
        """

        if not self._authentication_registered:
            self._logger.error(
                "Authentication components need to be registered. Skipping.",
            )
            return self

        if self._authorization_registered:
            self._logger.warning(
                "Authorization components already registered. Skipping.",
            )
            return self

        self._logger.info(
            "Setting up and registering Authorization components...",
        )

        try:
            from castlecraft_engineer.authorization.setup import setup_authorization

            setup_authorization(self._container)
            self._authorization_registered = True
            self._logger.info(
                "Authorization components set up and registered.",
            )
        except Exception as e:
            self._logger.error(
                f"Failed to set up authorization components: {e}",
                exc_info=True,
            )

        return self

    def register(
        self,
        type_or_name: Any,
        **kwargs,
    ) -> "ContainerBuilder":
        """Directly register a component."""
        self._container.register(type_or_name, **kwargs)
        return self

    def build(self) -> punq.Container:
        """Returns the configured container."""
        self._logger.info("DI container build complete.")
        return self._container

build()

Returns the configured container.

Source code in src/castlecraft_engineer/common/di.py
def build(self) -> punq.Container:
    """Returns the configured container."""
    self._logger.info("DI container build complete.")
    return self._container

register(type_or_name, **kwargs)

Directly register a component.

Source code in src/castlecraft_engineer/common/di.py
def register(
    self,
    type_or_name: Any,
    **kwargs,
) -> "ContainerBuilder":
    """Directly register a component."""
    self._container.register(type_or_name, **kwargs)
    return self

with_async_database()

Registers asynchronous database connection and components.

Source code in src/castlecraft_engineer/common/di.py
def with_async_database(self) -> "ContainerBuilder":
    """
    Registers asynchronous database
    connection and components.
    """
    if self._async_db_registered:
        self._logger.warning(
            "Asynchronous database components already registered. Skipping.",  # noqa: E501
        )
        return self

    self._logger.info("Registering asynchronous database components...")

    try:
        from sqlalchemy.ext.asyncio import (
            AsyncEngine,
            AsyncSession,
            async_sessionmaker,
        )

        from castlecraft_engineer.database.connection import (
            AsyncSessionFactory,
            get_async_engine,
        )

        async_engine = get_async_engine()
        self._container.register(
            AsyncEngine, instance=async_engine, name="db_async_engine"
        )

        self._container.register(
            async_sessionmaker[AsyncSession],
            instance=AsyncSessionFactory,
            name="db_async_session_factory",
        )

        self._container.register(
            AsyncSession, factory=lambda: AsyncSessionFactory()
        )

        self._async_db_registered = True
        self._logger.info("Asynchronous database components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register asynchronous database components: {e}",
            exc_info=True,
        )

    return self

with_authentication()

Registers Authentication connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_authentication(self) -> "ContainerBuilder":
    """
    Registers Authentication connection and components
    """
    if self._authentication_registered:
        self._logger.warning(
            "Authentication components already registered. Skipping.",
        )
        return self

    self._logger.info(
        "Registering Authentication components (AuthenticationService)...",
    )

    try:
        from castlecraft_engineer.application.auth import AuthenticationService

        # Prefer async if registered, otherwise use sync if registered
        def auth_service_factory(
            container=self._container,
        ):
            sync_cache = None
            async_cache = None
            if self._async_cache_registered:
                try:
                    import redis.asyncio as aredis

                    # Resolve the async client (trigger factory)
                    async_cache = container.resolve(
                        aredis.Redis,
                        name="cache_async",
                    )
                    self._logger.info(
                        "AuthenticationService will use asynchronous cache.",
                    )
                except ImportError:
                    self._logger.info(
                        "aredis library not found for auth_service_factory. Async cache will not be used."  # noqa: E501
                    )
                except Exception as e:
                    self._logger.error(
                        f"Failed to resolve async cache for AuthenticationService: {e}"
                    )
            if not async_cache and self._cache_registered:
                try:
                    import redis

                    sync_cache = container.resolve(
                        redis.Redis,
                        name="cache_sync",
                    )
                    self._logger.info(
                        "AuthenticationService will use synchronous cache.",
                    )
                except ImportError:
                    self._logger.info(
                        "redis library not found for auth_service_factory. Sync cache will not be used."  # noqa: E501
                    )
                except Exception as e:
                    self._logger.error(
                        f"Failed to resolve sync cache for AuthenticationService: {e}"
                    )

            return AuthenticationService(
                cache_client=sync_cache, async_cache_client=async_cache
            )

        self._container.register(
            AuthenticationService,
            factory=auth_service_factory,
            scope=punq.Scope.singleton,
        )
        self._authentication_registered = True
        self._logger.info("Authentication components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register Authentication components: {e}",
            exc_info=True,
        )

    return self

with_authorization()

Registers Authorization connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_authorization(self) -> "ContainerBuilder":
    """
    Registers Authorization connection and components
    """

    if not self._authentication_registered:
        self._logger.error(
            "Authentication components need to be registered. Skipping.",
        )
        return self

    if self._authorization_registered:
        self._logger.warning(
            "Authorization components already registered. Skipping.",
        )
        return self

    self._logger.info(
        "Setting up and registering Authorization components...",
    )

    try:
        from castlecraft_engineer.authorization.setup import setup_authorization

        setup_authorization(self._container)
        self._authorization_registered = True
        self._logger.info(
            "Authorization components set up and registered.",
        )
    except Exception as e:
        self._logger.error(
            f"Failed to set up authorization components: {e}",
            exc_info=True,
        )

    return self

with_cache(is_async=False)

Registers Cache connection and components.

Parameters:

Name Type Description Default
is_async bool

If True, registers the asynchronous Redis client. If False (default), registers the synchronous client.

False
Source code in src/castlecraft_engineer/common/di.py
def with_cache(self, is_async: bool = False) -> "ContainerBuilder":
    """
    Registers Cache connection and components.

    Args:
        is_async: If True, registers the asynchronous Redis client.
                  If False (default), registers the synchronous client.
    """
    if is_async and self._async_cache_registered:
        self._logger.warning(
            "Asynchronous cache components already registered. Skipping.",
        )
        return self
    if not is_async and self._cache_registered:
        self._logger.warning(
            "Synchronous cache components already registered. Skipping.",
        )
        return self

    if is_async:
        self._logger.info(
            "Registering asynchronous cache components...",
        )

        try:
            import redis.asyncio as aredis

            from castlecraft_engineer.caching.cache import (
                get_redis_cache_async_connection,
            )

            # Safer: Register a factory
            self._container.register(
                aredis.Redis,
                factory=lambda **_: get_redis_cache_async_connection(),
                scope=punq.Scope.singleton,
                name="cache_async",
            )
            self._async_cache_registered = True
            self._logger.info(
                "Asynchronous cache components registered (factory).",
            )
        except Exception as e:
            self._logger.error(
                f"Failed to register asynchronous cache components: {e}",
                exc_info=True,
            )
    else:
        self._logger.info("Registering synchronous cache components...")
        try:
            import redis

            from castlecraft_engineer.caching.cache import (
                get_redis_cache_connection,
            )

            sync_cache_client = get_redis_cache_connection()
            self._container.register(
                redis.Redis, instance=sync_cache_client, name="cache_sync"
            )
            self._cache_registered = True
            self._logger.info("Synchronous cache components registered.")
        except Exception as e:
            self._logger.error(
                f"Failed to register synchronous cache components: {e}",
                exc_info=True,
            )

    return self

with_command_bus()

Registers the CommandBus as a singleton.

Source code in src/castlecraft_engineer/common/di.py
def with_command_bus(self) -> "ContainerBuilder":
    """Registers the CommandBus as a singleton."""
    if self._command_bus_registered:
        self._logger.warning("CommandBus already registered. Skipping.")
        return self

    self._logger.info("Registering CommandBus...")
    try:
        from castlecraft_engineer.abstractions.command_bus import CommandBus

        self._container.register(
            CommandBus,
            factory=lambda c=self._container: CommandBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._command_bus_registered = True
        self.command_bus = self._container.resolve(CommandBus)
        self._logger.info("CommandBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register CommandBus: {e}", exc_info=True)
    return self

with_database()

Registers database connection and components

Source code in src/castlecraft_engineer/common/di.py
def with_database(self) -> "ContainerBuilder":
    """
    Registers database connection and components
    """
    if self._db_registered:
        self._logger.warning(
            "Database components already registered. Skipping.",
        )
        return self

    self._logger.info("Registering synchronous database components...")

    try:
        from sqlalchemy import Engine
        from sqlalchemy.orm import Session, sessionmaker

        from castlecraft_engineer.database.connection import (
            SyncSessionFactory,
            get_engine,
        )

        sync_engine = get_engine()
        self._container.register(
            Engine,
            instance=sync_engine,
            name="db_sync_engine",
        )

        self._container.register(
            sessionmaker[Session],
            instance=SyncSessionFactory,
            name="db_sync_session_factory",
        )

        self._container.register(
            Session,
            factory=lambda: SyncSessionFactory(),
        )

        self._db_registered = True
        self._logger.info("Synchronous database components registered.")
    except Exception as e:
        self._logger.error(
            f"Failed to register synchronous database components: {e}",
            exc_info=True,
        )

    return self

with_event_bus()

Registers the EventBus as a singleton. Note: Event handlers are typically registered directly with the EventBus instance after it's resolved, not via the DI container for the handlers themselves unless the EventBus is modified to resolve handlers.

Source code in src/castlecraft_engineer/common/di.py
def with_event_bus(self) -> "ContainerBuilder":
    """
    Registers the EventBus as a singleton.
    Note: Event handlers are typically registered directly with the
    EventBus instance after it's resolved, not via the DI container
    for the handlers themselves unless the EventBus is modified to
    resolve handlers.
    """
    if self._event_bus_registered:
        self._logger.warning("EventBus already registered. Skipping.")
        return self

    self._logger.info("Registering EventBus...")
    try:
        from castlecraft_engineer.abstractions.event_bus import EventBus

        self._container.register(
            EventBus,
            factory=lambda c=self._container: EventBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._event_bus_registered = True
        self.event_bus = self._container.resolve(EventBus)
        self._logger.info("EventBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register EventBus: {e}", exc_info=True)
    return self

with_query_bus()

Registers the QueryBus as a singleton.

Source code in src/castlecraft_engineer/common/di.py
def with_query_bus(self) -> "ContainerBuilder":
    """Registers the QueryBus as a singleton."""
    if self._query_bus_registered:
        self._logger.warning("QueryBus already registered. Skipping.")
        return self

    self._logger.info("Registering QueryBus...")
    try:
        from castlecraft_engineer.abstractions.query_bus import QueryBus

        self._container.register(
            QueryBus,
            factory=lambda c=self._container: QueryBus(container=c),
            scope=punq.Scope.singleton,
        )
        self._query_bus_registered = True
        self.query_bus = self._container.resolve(QueryBus)
        self._logger.info("QueryBus registered as singleton.")
    except Exception as e:
        self._logger.error(f"Failed to register QueryBus: {e}", exc_info=True)
    return self

Requests

A class that mimics core functionalities of the requests library using Python's standard http.client.

Source code in src/castlecraft_engineer/common/requests.py
class Requests:
    """
    A class that mimics core functionalities of the requests library
    using Python's standard http.client.
    """

    def _prepare_url_and_connection(
        self, url: str, verify_ssl: bool = True, timeout: float = 10.0
    ) -> Tuple[
        Union[http.client.HTTPSConnection, http.client.HTTPConnection], str, str
    ]:
        parsed_url = urllib.parse.urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(
                f"Invalid URL: '{url}'. Missing scheme or network location."
            )

        host = parsed_url.hostname
        if host is None:
            raise ValueError(f"Invalid URL: '{url}'. Could not determine hostname.")

        path = parsed_url.path
        if not path:
            path = "/"
        if parsed_url.query:
            path += "?" + parsed_url.query

        port: Optional[int] = parsed_url.port

        if parsed_url.scheme == "https":
            context = ssl.create_default_context()
            if not verify_ssl:
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            effective_port = port if port is not None else 443
            conn: Union[http.client.HTTPConnection, http.client.HTTPSConnection] = (
                http.client.HTTPSConnection(
                    host, port=effective_port, context=context, timeout=timeout
                )
            )
        elif parsed_url.scheme == "http":
            effective_port = port if port is not None else 80
            conn = http.client.HTTPConnection(
                host, port=effective_port, timeout=timeout
            )
        else:
            raise ValueError(f"Unsupported URL scheme: {parsed_url.scheme}")

        return conn, host, path

    def _extract_common_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        common_args: Dict[str, Any] = {}
        common_args["headers"] = kwargs.pop("headers", None)
        common_args["auth"] = kwargs.pop("auth", None)

        verify = kwargs.pop("verify", True)
        if isinstance(verify, str):
            common_args["verify_ssl"] = True
        else:
            common_args["verify_ssl"] = bool(verify)

        common_args["timeout"] = float(kwargs.pop("timeout", 10.0))
        return common_args

    def _request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json_data: Optional[Any] = None,
        **kwargs: Any,
    ) -> Response:
        common_args = self._extract_common_kwargs(kwargs)
        headers: Optional[Dict[str, str]] = common_args["headers"]
        auth: Optional[Tuple[str, str]] = common_args["auth"]
        verify_ssl: bool = common_args["verify_ssl"]
        timeout: float = common_args["timeout"]

        actual_url = url
        if params:
            query_string = urllib.parse.urlencode(params, doseq=True)
            if "?" in actual_url:
                actual_url += "&" + query_string
            else:
                actual_url += "?" + query_string

        conn, host_header_val, path = self._prepare_url_and_connection(
            actual_url, verify_ssl, timeout
        )

        request_headers: Dict[str, str] = {"Host": host_header_val}
        if headers:
            # Ensure header keys and values are strings
            header_items = headers.items()  # Make items() call explicit
            for key, value in header_items:
                str_key = str(key)
                str_value = str(value)
                request_headers[str_key] = str_value

        body_bytes: Optional[bytes] = None
        if json_data is not None and data is not None:
            raise ValueError(
                "Cannot provide both 'data' and 'json' (json_data internal)."
            )

        if json_data is not None:
            body_bytes = json.dumps(json_data).encode("utf-8")
            if "content-type" not in {k.lower() for k in request_headers.keys()}:
                request_headers["Content-Type"] = "application/json"
        elif data is not None:
            if isinstance(data, dict):
                body_bytes = urllib.parse.urlencode(data, doseq=True).encode("utf-8")
                if "content-type" not in {k.lower() for k in request_headers.keys()}:
                    request_headers["Content-Type"] = (
                        "application/x-www-form-urlencoded"
                    )
            elif isinstance(data, str):
                body_bytes = data.encode("utf-8")
            elif isinstance(data, bytes):
                body_bytes = data
            else:
                raise TypeError("Data must be a dict, str, or bytes.")

        if auth and isinstance(auth, tuple) and len(auth) == 2:
            user, passwd = auth
            auth_header_val = b64encode(f"{user}:{passwd}".encode()).decode("ascii")
            request_headers["Authorization"] = f"Basic {auth_header_val}"

        try:
            conn.request(method.upper(), path, body=body_bytes, headers=request_headers)
            http_response = conn.getresponse()
            response_content = http_response.read()

            # For Response object context
            final_request_headers = request_headers.copy()

            return Response(
                status_code=http_response.status,
                headers=http_response.headers,
                content=response_content,
                url=actual_url,
                request_method=method.upper(),
                request_body=body_bytes,
                request_headers=final_request_headers,
            )
        except (
            http.client.HTTPException,
            OSError,
            ssl.SSLError,
            ConnectionRefusedError,
            TimeoutError,
        ) as e:
            # TimeoutError for Python 3.3+ for socket timeouts
            raise HTTPError(f"Request failed for {method} {url}: {e}") from e
        finally:
            conn.close()

    def get(
        self, url: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Response:
        return self._request("GET", url, params=params, **kwargs)

    def post(
        self,
        url: str,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> Response:
        return self._request("POST", url, data=data, json_data=json, **kwargs)

    def put(
        self,
        url: str,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> Response:
        return self._request("PUT", url, data=data, json_data=json, **kwargs)

    def delete(self, url: str, **kwargs: Any) -> Response:
        return self._request("DELETE", url, **kwargs)

    def head(self, url: str, **kwargs: Any) -> Response:
        # HEAD requests should not have a body in the response,
        # but http.client handles this.
        # Our Response object will have empty content.
        return self._request("HEAD", url, **kwargs)

    def options(self, url: str, **kwargs: Any) -> Response:
        return self._request("OPTIONS", url, **kwargs)

    def patch(
        self,
        url: str,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> Response:
        return self._request("PATCH", url, data=data, json_data=json, **kwargs)

Response

A simple class for representing an HTTP Response object.

Source code in src/castlecraft_engineer/common/requests.py
class Response:
    """
    A simple class for representing an HTTP Response object.
    """

    def __init__(
        self,
        status_code: int,
        headers: http.client.HTTPMessage,
        content: bytes,
        url: str,
        request_method: str,
        request_body: Optional[bytes] = None,
        request_headers: Optional[Dict[str, str]] = None,
    ):
        self.status_code = status_code
        self._raw_headers = headers
        self.content = content
        self.url = url
        self.request_method = request_method
        self.request_body = request_body
        self.request_headers = request_headers if request_headers else {}
        self._cached_json: Optional[Any] = None
        self._text_content: Optional[str] = None

    @property
    def headers(self) -> Dict[str, str]:
        """Returns headers as a case-insensitive dictionary."""
        # http.client.HTTPMessage is already somewhat case-insensitive for get()
        # but this provides a consistent dict view with lowercased keys.
        return {k.lower(): v for k, v in self._raw_headers.items()}

    @property
    def text(self) -> str:
        """Returns the content of the response, in unicode."""
        if self._text_content is None:
            content_type = self.headers.get("content-type", "")
            charset = "utf-8"  # Default
            if "charset=" in content_type:
                parts = content_type.split("charset=")
                if len(parts) > 1:
                    charset = parts[-1].split(";")[0].strip()
            try:
                self._text_content = self.content.decode(charset)
            except (UnicodeDecodeError, LookupError):
                self._text_content = self.content.decode("utf-8", errors="replace")
        return self._text_content

    def json(self, **kwargs) -> Any:
        """Returns the json-encoded content of a response, if any."""
        if self._cached_json is None:
            if not self.content:
                raise json.JSONDecodeError("No content to decode as JSON", "", 0)
            try:
                self._cached_json = json.loads(self.text, **kwargs)
            except json.JSONDecodeError as e:
                # Provide more context if possible
                raise ValueError(
                    f"Failed to decode JSON from URL {self.url}. Error: {e}. Content: '{self.text[:100]}...'"
                ) from e
        return self._cached_json

    def raise_for_status(self):
        """
        Raises an HTTPError if the HTTP request
        returned an unsuccessful status code.
        """
        if 400 <= self.status_code < 500:
            raise HTTPError(
                f"{self.status_code} Client Error for url: {self.url}",
                response=self,
            )
        elif 500 <= self.status_code < 600:
            raise HTTPError(
                f"{self.status_code} Server Error for url: {self.url}",
                response=self,
            )

    def __repr__(self):
        return f"<Response [{self.status_code}]>"

headers property

Returns headers as a case-insensitive dictionary.

text property

Returns the content of the response, in unicode.

json(**kwargs)

Returns the json-encoded content of a response, if any.

Source code in src/castlecraft_engineer/common/requests.py
def json(self, **kwargs) -> Any:
    """Returns the json-encoded content of a response, if any."""
    if self._cached_json is None:
        if not self.content:
            raise json.JSONDecodeError("No content to decode as JSON", "", 0)
        try:
            self._cached_json = json.loads(self.text, **kwargs)
        except json.JSONDecodeError as e:
            # Provide more context if possible
            raise ValueError(
                f"Failed to decode JSON from URL {self.url}. Error: {e}. Content: '{self.text[:100]}...'"
            ) from e
    return self._cached_json

raise_for_status()

Raises an HTTPError if the HTTP request returned an unsuccessful status code.

Source code in src/castlecraft_engineer/common/requests.py
def raise_for_status(self):
    """
    Raises an HTTPError if the HTTP request
    returned an unsuccessful status code.
    """
    if 400 <= self.status_code < 500:
        raise HTTPError(
            f"{self.status_code} Client Error for url: {self.url}",
            response=self,
        )
    elif 500 <= self.status_code < 600:
        raise HTTPError(
            f"{self.status_code} Server Error for url: {self.url}",
            response=self,
        )

decrypt_data(encrypted_data, key)

Decrypts data encrypted with AES-GCM.

Parameters:

Name Type Description Default
encrypted_data str

The base64 encoded string containing the nonce and ciphertext.

required
key bytes

The AES decryption key (must be 16, 24, or 32 bytes).

required

Returns:

Type Description
str

The decrypted string data.

Source code in src/castlecraft_engineer/common/crypto.py
def decrypt_data(encrypted_data: str, key: bytes) -> str:
    """
    Decrypts data encrypted with AES-GCM.

    Args:
        encrypted_data: The base64 encoded string containing the nonce and ciphertext.
        key: The AES decryption key (must be 16, 24, or 32 bytes).

    Returns:
        The decrypted string data.
    """
    if len(key) not in [16, 24, 32]:
        raise InvalidEncryptionKey("AES key must be 16, 24, or 32 bytes long")

    try:
        encrypted_payload_bytes = b64decode(
            encrypted_data.encode("utf-8"), validate=True
        )
    except Exception as e:  # Catches binascii.Error from b64decode
        raise InvalidEncryptionFormat(f"Invalid base64 encoding: {e}")

    nonce_size = 12  # Must match the nonce size used during encryption
    if len(encrypted_payload_bytes) < nonce_size:
        raise InvalidEncryptionFormat("Encrypted data is too short to contain a nonce.")

    nonce = encrypted_payload_bytes[:nonce_size]
    ciphertext_and_tag = encrypted_payload_bytes[nonce_size:]

    aesgcm = AESGCM(key)
    try:
        plaintext_bytes = aesgcm.decrypt(
            nonce, ciphertext_and_tag, None
        )  # No associated data
        return plaintext_bytes.decode("utf-8")
    except InvalidTag:
        raise InvalidEncryptionFormat(
            "Decryption failed: authentication tag mismatch or data corrupted."
        )
    except Exception as e:  # Catch any other potential errors during decryption
        raise InvalidEncryptionFormat(
            f"Decryption failed due to an unexpected error: {e}"
        )

encrypt_data(data, key)

Encrypts data using AES-GCM.

Parameters:

Name Type Description Default
data str

The string data to encrypt.

required
key bytes

The AES encryption key (must be 16, 24, or 32 bytes).

required

Returns:

Type Description
str

A base64 encoded string of the nonce prepended to the ciphertext and tag.

Source code in src/castlecraft_engineer/common/crypto.py
def encrypt_data(data: str, key: bytes) -> str:
    """
    Encrypts data using AES-GCM.

    Args:
        data: The string data to encrypt.
        key: The AES encryption key (must be 16, 24, or 32 bytes).

    Returns:
        A base64 encoded string of the nonce prepended to the ciphertext and tag.
    """
    if len(key) not in [16, 24, 32]:
        raise InvalidEncryptionKey("AES key must be 16, 24, or 32 bytes long")

    aesgcm = AESGCM(key)
    nonce = os.urandom(12)  # AES-GCM standard nonce size is 12 bytes (96 bits)
    data_bytes = data.encode("utf-8")

    # encrypt() returns ciphertext which includes the authentication tag
    ciphertext_and_tag = aesgcm.encrypt(nonce, data_bytes, None)  # No associated data

    # Prepend nonce to ciphertext_and_tag, then base64 encode
    encrypted_payload = nonce + ciphertext_and_tag
    return b64encode(encrypted_payload).decode("utf-8")

get_async_redis_connection(redis_uri=None, is_sentinel_enabled=False, sentinels_uri=None, sentinel_username=None, sentinel_password=None, sentinel_master_username=None, sentinel_master_password=None, sentinel_master_service=None) async

Gets an asynchronous Redis connection.

Source code in src/castlecraft_engineer/common/redis.py
async def get_async_redis_connection(
    redis_uri: Optional[str] = None,
    is_sentinel_enabled: bool = False,
    sentinels_uri: Optional[str] = None,
    sentinel_username: Optional[str] = None,
    sentinel_password: Optional[str] = None,
    sentinel_master_username: Optional[str] = None,
    sentinel_master_password: Optional[str] = None,
    sentinel_master_service: Optional[str] = None,
) -> aredis.Redis:
    """Gets an asynchronous Redis connection."""
    if is_sentinel_enabled:
        if not sentinels_uri:
            raise ValueError(
                "sentinels_uri must be provided when is_sentinel_enabled is True."
            )
        if not sentinel_master_service:
            raise ValueError(
                "sentinel_master_service must be provided when is_sentinel_enabled is True."
            )

        uris = split_string(",", sentinels_uri)
        sentinels = []
        for sen in uris:
            if ":" in sen:
                hostname, port_str = sen.split(":", 1)
                try:
                    sentinels.append((hostname.strip(), int(port_str.strip())))
                except ValueError:
                    pass

        sentinel_kwargs = {}
        if sentinel_username:
            sentinel_kwargs["username"] = sentinel_username
        if sentinel_password:
            sentinel_kwargs["password"] = sentinel_password

        sentinel = aredis.Sentinel(
            sentinels=sentinels,
            sentinel_kwargs=sentinel_kwargs,
            username=sentinel_master_username,
            password=sentinel_master_password,
        )

        connection = sentinel.master_for(sentinel_master_service)
        await connection.ping()

        return connection

    # Note: decode_responses=True can be added here if needed globally
    connection = aredis.from_url(redis_uri)
    # Use await for async ping
    await connection.ping()

    return connection