Skip to content

Registry class

edgy.Registry

Registry(database, *, with_content_type=False, schema=None, extra=None, **kwargs)

The command center for the models of Edgy.

PARAMETER DESCRIPTION
database

TYPE: Union[Database, str, DatabaseURL]

with_content_type

TYPE: Union[bool, type[BaseModelType]] DEFAULT: False

schema

TYPE: Union[str, None] DEFAULT: None

extra

TYPE: Optional[dict[str, Database]] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/connection/registry.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def __init__(
    self,
    database: Union[Database, str, DatabaseURL],
    *,
    with_content_type: Union[bool, type["BaseModelType"]] = False,
    schema: Union[str, None] = None,
    extra: Optional[dict[str, Database]] = None,
    **kwargs: Any,
) -> None:
    """
    Set up the registry for a main database and optional extra databases.

    Args:
        database: The main database. Anything that is not already a
            `Database` instance is wrapped via `Database(database, **kwargs)`.
        with_content_type: `False` to disable content types; any other value
            is forwarded to `_set_content_type`.
        schema: Stored as `db_schema`.
        extra: Additional databases keyed by name. Values which are not
            `Database` instances are wrapped with `Database(value)`.
        **kwargs: Only used when `database` has to be wrapped in a `Database`.

    Raises:
        AssertionError: If at least one key of `extra` is rejected by
            `extra_name_check`.
    """
    self.db_schema = schema
    extra = extra or {}
    self.database: Database = (
        database if isinstance(database, Database) else Database(database, **kwargs)
    )
    self.models: dict[str, type[BaseModelType]] = {}
    self.reflected: dict[str, type[BaseModelType]] = {}
    self.tenant_models: dict[str, type[BaseModelType]] = {}
    self.pattern_models: dict[str, type[AutoReflectionModel]] = {}
    self.dbs_reflected: set[Union[str, None]] = set()

    self.schema = Schema(registry=self)
    # when setting a Model or Reflected Model execute the callbacks
    # Note: they are only executed if the Model is not in Registry yet
    self._onetime_callbacks: dict[
        Union[str, None], list[Callable[[type[BaseModelType]], None]]
    ] = defaultdict(list)
    self._callbacks: dict[Union[str, None], list[Callable[[type[BaseModelType]], None]]] = (
        defaultdict(list)
    )

    self.extra: dict[str, Database] = {
        k: v if isinstance(v, Database) else Database(v) for k, v in extra.items()
    }
    # We want to get all problems before failing, so every name is checked
    # (a list, not a generator, to avoid short-circuiting).
    name_checks = [self.extra_name_check(x) for x in self.extra]
    # Raised explicitly instead of via `assert`: assert statements are removed
    # under `python -O`, which would silently skip the validation.
    if not all(name_checks):
        raise AssertionError("Invalid name in extra detected. See logs for details.")
    self.metadata_by_url = MetaDataByUrlDict(registry=self)

    if with_content_type is not False:
        self._set_content_type(with_content_type)

db_schema class-attribute instance-attribute

db_schema = schema

content_type class-attribute instance-attribute

content_type = None

dbs_reflected instance-attribute

dbs_reflected = set()

database instance-attribute

database = database if isinstance(database, Database) else Database(database, **kwargs)

models instance-attribute

models = {}

reflected instance-attribute

reflected = {}

tenant_models instance-attribute

tenant_models = {}

pattern_models instance-attribute

pattern_models = {}

schema instance-attribute

schema = Schema(registry=self)

_onetime_callbacks instance-attribute

_onetime_callbacks = defaultdict(list)

_callbacks instance-attribute

_callbacks = defaultdict(list)

extra instance-attribute

extra = {k: v if isinstance(v, Database) else Database(v) for k, v in extra.items()}

metadata_by_url instance-attribute

metadata_by_url = MetaDataByUrlDict(registry=self)

metadata_by_name property writable

metadata_by_name

metadata property

metadata

declarative_base cached property

declarative_base

engine property

engine

sync_engine property

sync_engine

extra_name_check

extra_name_check(name)
PARAMETER DESCRIPTION
name

TYPE: Any

Source code in edgy/core/connection/registry.py
151
152
153
154
155
156
157
158
159
160
161
162
163
def extra_name_check(self, name: Any) -> bool:
    """
    Validate a name used as key for an extra database.

    Non-string names and names consisting only of whitespace are rejected
    (an error is logged and `False` is returned). Names with leading or
    trailing whitespace are accepted, but a warning is logged.
    """
    if not isinstance(name, str):
        logger.error(f"Extra database name: {name!r} is not a string.")
        return False

    trimmed = name.strip()
    if not trimmed:
        logger.error(f'Extra database name: "{name}" is empty.')
        return False

    # surrounding whitespace is suspicious but not fatal
    if trimmed != name:
        logger.warning(
            f'Extra database name: "{name}" starts or ends with whitespace characters.'
        )
    return True

__copy__

__copy__()
Source code in edgy/core/connection/registry.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def __copy__(self) -> "Registry":
    """
    Build a new `Registry` on the same databases holding copies of the models.

    The content type (if any) is copied first, with its `reverse_*` fields
    removed from the copy. Afterwards every model of `models`, `reflected`,
    `tenant_models` and `pattern_models` which is not yet present in the
    new registry is copied into it.
    """
    content_type: Union[bool, type[BaseModelType]] = False
    if self.content_type is not None:
        try:
            copied_content_type = content_type = self.get_model(
                "ContentType", include_content_type_attr=False
            ).copy_edgy_model()
            # strip the reverse relations from the content type copy
            reverse_names = list(copied_content_type.meta.fields.keys())
            for reverse_name in reverse_names:
                if not reverse_name.startswith("reverse_"):
                    continue
                del copied_content_type.meta.fields[reverse_name]
        except LookupError:
            # no model named "ContentType" is registered: reuse the attribute
            content_type = self.content_type

    _copy = Registry(
        self.database, with_content_type=content_type, schema=self.db_schema, extra=self.extra
    )
    for attr_name in ("models", "reflected", "tenant_models", "pattern_models"):
        copied_models = getattr(_copy, attr_name)
        for model_name, model in getattr(self, attr_name).items():
            # membership is re-checked for every entry while the target fills up
            if model_name in copied_models:
                continue
            copied_models[model_name] = model.copy_edgy_model(_copy)
    _copy.dbs_reflected = set(self.dbs_reflected)
    return _copy

_set_content_type

_set_content_type(with_content_type, old_content_type_to_replace=None)
PARAMETER DESCRIPTION
with_content_type

TYPE: Union[Literal[True], type[BaseModelType]]

old_content_type_to_replace

TYPE: Optional[type[BaseModelType]] DEFAULT: None

Source code in edgy/core/connection/registry.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def _set_content_type(
    self,
    with_content_type: Union[Literal[True], type["BaseModelType"]],
    old_content_type_to_replace: Optional[type["BaseModelType"]] = None,
) -> None:
    """
    Install a content type model on this registry and wire models to it.

    The resolved model is stored in `self.content_type`. Additionally a general,
    non-one-time callback is registered which gives models a `content_type`
    field plus a matching `reverse_<modelname>` relation on the content type.

    Args:
        with_content_type: `True` selects `ContentType` from
            `edgy.contrib.contenttypes.models`; otherwise the model class to use.
        old_content_type_to_replace: When given, existing content type fields
            whose target is this class are re-pointed to the new content type.
    """
    # imported locally (NOTE(review): presumably to avoid import cycles - confirm)
    from edgy.contrib.contenttypes.fields import BaseContentTypeFieldField, ContentTypeField
    from edgy.contrib.contenttypes.models import ContentType
    from edgy.core.db.models.metaclasses import MetaInfo
    from edgy.core.db.relationships.related_field import RelatedField
    from edgy.core.utils.models import create_edgy_model

    if with_content_type is True:
        with_content_type = ContentType

    real_content_type: type[BaseModelType] = with_content_type

    if real_content_type.meta.abstract:
        # abstract model: derive a model named "ContentType" from it which is
        # bound to this registry and uses the table name "contenttypes"
        meta_args = {
            "tablename": "contenttypes",
            "registry": self,
        }

        new_meta: MetaInfo = MetaInfo(None, **meta_args)
        # model adds itself to registry and executes callbacks
        real_content_type = create_edgy_model(
            "ContentType",
            with_content_type.__module__,
            __metadata__=new_meta,
            __bases__=(with_content_type,),
        )
    elif real_content_type.meta.registry is None:
        # non-abstract model without a registry: attach it here as "ContentType"
        real_content_type.add_to_registry(self, "ContentType")
    self.content_type = real_content_type

    def callback(model_class: type["BaseModelType"]) -> None:
        # subclasses of ContentType never get a content_type field themselves
        # they are not updated, despite this shouldn't happen anyway
        if issubclass(model_class, ContentType):
            return
        # skip if is explicit set or remove when copying
        # A model which already has a content type field is left alone, except
        # that a field targeting the replaced content type is re-pointed.
        for field in model_class.meta.fields.values():
            if isinstance(field, BaseContentTypeFieldField):
                if (
                    old_content_type_to_replace is not None
                    and field.target is old_content_type_to_replace
                ):
                    field.target_registry = self
                    field.target = real_content_type
                    # simply overwrite
                    real_content_type.meta.fields[field.related_name] = RelatedField(
                        name=field.related_name,
                        foreign_key_name=field.name,
                        related_from=model_class,
                        owner=real_content_type,
                    )
                # the first content type field found ends the callback
                return

        # e.g. exclude field
        if "content_type" in model_class.meta.fields:
            return
        related_name = f"reverse_{model_class.__name__.lower()}"
        # the reverse name is derived from the lowercased model name only, so
        # two models with the same lowercased name would collide
        assert (
            related_name not in real_content_type.meta.fields
        ), f"duplicate model name: {model_class.__name__}"

        field_args: dict[str, Any] = {
            "name": "content_type",
            "owner": model_class,
            "to": real_content_type,
            "no_constraint": real_content_type.no_constraint,
            "no_copy": True,
        }
        # model and content type live in different registries: set two extra
        # flags on the field
        # NOTE(review): presumably deletion is then handled by callbacks
        # instead of database constraints - confirm against ContentTypeField
        if model_class.meta.registry is not real_content_type.meta.registry:
            field_args["relation_has_post_delete_callback"] = True
            field_args["force_cascade_deletion_relation"] = True
        model_class.meta.fields["content_type"] = cast(
            "BaseFieldType",
            ContentTypeField(**field_args),
        )
        # reverse relation from the content type back to the model
        real_content_type.meta.fields[related_name] = RelatedField(
            name=related_name,
            foreign_key_name="content_type",
            related_from=model_class,
            owner=real_content_type,
        )

    # general callback (name None), kept permanently (one_time=False);
    # register_callback also invokes it for the models already registered
    self.register_callback(None, callback, one_time=False)

get_model

get_model(model_name, *, include_content_type_attr=True)
PARAMETER DESCRIPTION
model_name

TYPE: str

include_content_type_attr

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def get_model(
    self, model_name: str, *, include_content_type_attr: bool = True
) -> type["BaseModelType"]:
    """
    Look up a model class by name.

    The name "ContentType" resolves to `self.content_type` when that attribute
    is set and `include_content_type_attr` is true. Otherwise `models`,
    `reflected` and `tenant_models` are searched in this order.

    Raises:
        LookupError: If no model with this name is found.
    """
    wants_content_type = include_content_type_attr and model_name == "ContentType"
    if wants_content_type and self.content_type is not None:
        return self.content_type

    # first container holding the name wins
    for container_name in ("models", "reflected", "tenant_models"):
        container = getattr(self, container_name)
        if model_name in container:
            return container[model_name]
    raise LookupError(f"Registry doesn't have a {model_name} model.") from None

refresh_metadata

refresh_metadata(*, update_only=False, multi_schema=False, ignore_schema_pattern='information_schema')
PARAMETER DESCRIPTION
update_only

TYPE: bool DEFAULT: False

multi_schema

TYPE: Union[bool, Pattern, str] DEFAULT: False

ignore_schema_pattern

TYPE: Union[None, Pattern, str] DEFAULT: 'information_schema'

Source code in edgy/core/connection/registry.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def refresh_metadata(
    self,
    *,
    update_only: bool = False,
    multi_schema: Union[bool, re.Pattern, str] = False,
    ignore_schema_pattern: Union[None, "re.Pattern", str] = "information_schema",
) -> None:
    """
    Rebuild the tables of the registered models in the registry metadata.

    Args:
        update_only: When false, every metadata in `metadata_by_name` is cleared
            and the cached `_table` / `_db_schemas` of the models are reset
            before the tables are built again.
        multi_schema: `False` builds the main database models for `db_schema`
            and the extra database models for schema `None`. Any other value
            takes the schemas from `self.schema.get_schemes_tree`; a pattern
            (strings are compiled with `re.compile`) additionally restricts the
            schemas to those it matches.
        ignore_schema_pattern: Only used when `multi_schema` is not `False`:
            schemas matched by this pattern (strings are compiled) are skipped.
            `None` disables the filter.
    """
    if not update_only:
        for val in self.metadata_by_name.values():
            val.clear()
    maindatabase_url = str(self.database.url)
    # schemes_tree maps a database url string to (extra name or None, schema list)
    if multi_schema is not False:
        # element 0 of each value becomes the key, element 2 the schema list
        # NOTE(review): presumably (url, default schema, schemes) - confirm
        # against Schema.get_schemes_tree
        schemes_tree: dict[str, tuple[Optional[str], list[str]]] = {
            v[0]: (key, v[2])
            for key, v in run_sync(self.schema.get_schemes_tree(no_reflect=True)).items()
        }
    else:
        schemes_tree = {
            maindatabase_url: (None, [self.db_schema]),
            **{str(v.url): (k, [None]) for k, v in self.extra.items()},
        }

    # normalize string patterns once, before the loops
    if isinstance(multi_schema, str):
        multi_schema = re.compile(multi_schema)
    if isinstance(ignore_schema_pattern, str):
        ignore_schema_pattern = re.compile(ignore_schema_pattern)
    for model_class in self.models.values():
        if not update_only:
            model_class._table = None
            model_class._db_schemas = {}
        url = str(model_class.database.url)
        # models whose database url is not part of the tree are not built
        if url in schemes_tree:
            # extra_key is not used below
            extra_key, schemes = schemes_tree[url]
            for schema in schemes:
                # the schema filters only apply in multi schema mode
                if multi_schema is not False:
                    if multi_schema is not True and multi_schema.match(schema) is None:
                        continue
                    if (
                        ignore_schema_pattern is not None
                        and ignore_schema_pattern.match(schema) is not None
                    ):
                        continue
                    # models not flagged with meta.is_tenant are built only for
                    # their own schema: "" when __using_schema__ is Undefined
                    # or None, otherwise exactly __using_schema__
                    if not getattr(model_class.meta, "is_tenant", False):
                        if (
                            model_class.__using_schema__ is Undefined
                            or model_class.__using_schema__ is None
                        ):
                            if schema != "":
                                continue
                        elif model_class.__using_schema__ != schema:
                            continue
                model_class.table_schema(schema=schema, metadata=self.metadata_by_url[url])

    # reflected models only get their caches reset, table_schema is not called
    # don't initialize to keep the metadata clean
    if not update_only:
        for model_class in self.reflected.values():
            model_class._table = None
            model_class._db_schemas = {}

register_callback

register_callback(name_or_class, callback, one_time=None)
PARAMETER DESCRIPTION
name_or_class

TYPE: Union[type[BaseModelType], str, None]

callback

TYPE: Callable[[type[BaseModelType]], None]

one_time

TYPE: Optional[bool] DEFAULT: None

Source code in edgy/core/connection/registry.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def register_callback(
    self,
    name_or_class: Union[type["BaseModelType"], str, None],
    callback: Callable[[type["BaseModelType"]], None],
    one_time: Optional[bool] = None,
) -> None:
    """
    Register a callback for one model (name or class) or for all models (None).

    The callback is executed right away for the models which are already
    available. A one-time callback which could be executed is not stored;
    everything else is remembered under the model name (or None).

    Args:
        name_or_class: Model class, model name, or None for a general callback.
        callback: Called with the model class.
        one_time: Defaults to True for model specific callbacks and to False
            for general callbacks.
    """
    is_general = name_or_class is None
    if one_time is None:
        # True for model specific callbacks, False for general callbacks
        one_time = not is_general

    called: bool = False
    if is_general:
        callback_key = None
        for container_name in ("models", "reflected"):
            for model in getattr(self, container_name).values():
                callback(model)
                called = True
        for tenant_name, tenant_model in self.tenant_models.items():
            # for tenant only models
            if tenant_name in self.models:
                continue
            callback(tenant_model)
            called = True
    elif isinstance(name_or_class, str):
        callback_key = name_or_class
        try:
            found_model = self.get_model(name_or_class)
        except LookupError:
            found_model = None
        if found_model is not None:
            callback(found_model)
            called = True
    else:
        callback(name_or_class)
        called = True
        # callbacks are stored under the class name
        callback_key = name_or_class.__name__

    if not one_time:
        self._callbacks[callback_key].append(callback)
    elif not called:
        self._onetime_callbacks[callback_key].append(callback)

execute_model_callbacks

execute_model_callbacks(model_class)
PARAMETER DESCRIPTION
model_class

TYPE: type[BaseModelType]

Source code in edgy/core/connection/registry.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def execute_model_callbacks(self, model_class: type["BaseModelType"]) -> None:
    """
    Run the callbacks registered for `model_class`.

    One-time callbacks (first the ones for the model name, then the general
    ones) are consumed from the end of their list. Afterwards the permanent
    callbacks are executed in registration order, again model specific before
    general.
    """
    model_name = model_class.__name__
    lookup_order = (model_name, None)

    for key in lookup_order:
        # .get() does not create entries in the defaultdict
        pending = self._onetime_callbacks.get(key)
        while pending:
            one_time_callback = pending.pop()
            one_time_callback(model_class)

    for key in lookup_order:
        for permanent_callback in self._callbacks.get(key) or ():
            permanent_callback(model_class)

init_models

init_models(*, init_column_mappers=True, init_class_attrs=True)

Initializes lazy parts of models meta. Normally not needed to call.

PARAMETER DESCRIPTION
init_column_mappers

TYPE: bool DEFAULT: True

init_class_attrs

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def init_models(
    self, *, init_column_mappers: bool = True, init_class_attrs: bool = True
) -> None:
    """
    Initializes lazy parts of models meta. Normally not needed to call.

    `meta.full_init` is called for every model in `models` and afterwards
    for every model in `reflected`, forwarding both flags.
    """
    init_options = {
        "init_column_mappers": init_column_mappers,
        "init_class_attrs": init_class_attrs,
    }
    for container_name in ("models", "reflected"):
        for model_class in getattr(self, container_name).values():
            model_class.meta.full_init(**init_options)

invalidate_models

invalidate_models(*, clear_class_attrs=True)

Invalidate all lazy parts of meta. They will be automatically re-initialized on access.

PARAMETER DESCRIPTION
clear_class_attrs

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/registry.py
474
475
476
477
478
479
480
481
def invalidate_models(self, *, clear_class_attrs: bool = True) -> None:
    """
    Invalidate all lazy parts of meta. They will be automatically re-initialized on access.

    `meta.invalidate` is called for every model in `models` and afterwards
    for every model in `reflected`.
    """
    for container_name in ("models", "reflected"):
        for model_class in getattr(self, container_name).values():
            model_class.meta.invalidate(clear_class_attrs=clear_class_attrs)

get_tablenames

get_tablenames()
Source code in edgy/core/connection/registry.py
483
484
485
486
487
488
489
def get_tablenames(self) -> set[str]:
    """Return the table names (`meta.tablename`) of all regular and reflected models."""
    tablenames = {model_class.meta.tablename for model_class in self.models.values()}
    tablenames.update(model_class.meta.tablename for model_class in self.reflected.values())
    return tablenames

_connect_and_init async

_connect_and_init(name, database)
PARAMETER DESCRIPTION
name

TYPE: Union[str, None]

database

TYPE: Database

Source code in edgy/core/connection/registry.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
async def _connect_and_init(self, name: Union[str, None], database: "Database") -> None:
    """
    Connect `database` and create the concrete models of the pattern models.

    Nothing besides connecting happens when there are no pattern models or when
    `name` was already handled (it is in `dbs_reflected`). Otherwise the
    schemes of all pattern models which list `name` in `meta.databases` are
    reflected into a temporary metadata. For every reflected table accepted by
    a pattern model, a copy of that pattern model is added to the registry.

    If anything fails after the connection was established, the database is
    disconnected again before the error is re-raised. This includes the
    reflection itself, so a failing reflection does not leave an open
    connection behind.

    Args:
        name: Name of the extra database, None for the main database.
        database: The database to connect and to reflect.

    Raises:
        Exception: If a generated model name conflicts with an existing model.
    """
    from edgy.core.db.models.metaclasses import MetaInfo

    await database.connect()
    if not self.pattern_models or name in self.dbs_reflected:
        return
    try:
        schemes = set()
        for pattern_model in self.pattern_models.values():
            if name not in pattern_model.meta.databases:
                continue
            schemes.update(pattern_model.meta.schemes)
        tmp_metadata = sqlalchemy.MetaData()
        for schema in schemes:
            await database.run_sync(tmp_metadata.reflect, schema=schema)
        for table in tmp_metadata.tables.values():
            for pattern_model in self.pattern_models.values():
                if name not in pattern_model.meta.databases or table.schema not in schemes:
                    continue
                assert pattern_model.meta.model is pattern_model
                # table.key would contain the schema name
                if not pattern_model.meta.include_pattern.match(table.name) or (
                    pattern_model.meta.exclude_pattern
                    and pattern_model.meta.exclude_pattern.match(table.name)
                ):
                    continue
                if pattern_model.fields_not_supported_by_table(table):
                    continue
                new_name = pattern_model.meta.template(table)
                old_model: Optional[type[BaseModelType]] = None
                with contextlib.suppress(LookupError):
                    old_model = self.get_model(new_name)
                if old_model is not None:
                    raise Exception(
                        f"Conflicting model: {old_model.__name__} with pattern model: {pattern_model.__name__}"
                    )
                concrete_reflect_model = pattern_model.copy_edgy_model(
                    name=new_name, meta_info_class=MetaInfo
                )
                concrete_reflect_model.meta.tablename = table.name
                concrete_reflect_model.__using_schema__ = table.schema
                concrete_reflect_model.add_to_registry(self, database=database)

        # only mark as done when every table was processed successfully
        self.dbs_reflected.add(name)
    except BaseException:
        # BaseException, so the connection is also released on cancellation
        await database.disconnect()
        raise

__aenter__ async

__aenter__()
Source code in edgy/core/connection/registry.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
async def __aenter__(self) -> "Registry":
    """
    Connect the main database and all extra databases concurrently.

    When at least one database fails, the failures are logged and the
    databases which were set up successfully are disconnected again.
    Note: the failure is not re-raised, the registry is returned in any case.
    """
    dbs: list[tuple[Union[str, None], Database]] = [(None, self.database)]
    dbs.extend(self.extra.items())
    connect_ops = [self._connect_and_init(db_name, db) for db_name, db in dbs]
    results: list[Union[BaseException, bool]] = await asyncio.gather(
        *connect_ops, return_exceptions=True
    )
    if not any(isinstance(outcome, BaseException) for outcome in results):
        return self

    # roll back: results are in the same order as dbs
    disconnect_ops = []
    for (_, db), outcome in zip(dbs, results):
        if isinstance(outcome, BaseException):
            logger.opt(exception=outcome).error("Failed to connect database.")
        else:
            disconnect_ops.append(db.disconnect())
    await asyncio.gather(*disconnect_ops)
    return self

__aexit__ async

__aexit__(exc_type=None, exc_value=None, traceback=None)
PARAMETER DESCRIPTION
exc_type

TYPE: Optional[type[BaseException]] DEFAULT: None

exc_value

TYPE: Optional[BaseException] DEFAULT: None

traceback

TYPE: Optional[TracebackType] DEFAULT: None

Source code in edgy/core/connection/registry.py
557
558
559
560
561
562
563
564
565
566
async def __aexit__(
    self,
    exc_type: Optional[type[BaseException]] = None,
    exc_value: Optional[BaseException] = None,
    traceback: Optional[TracebackType] = None,
) -> None:
    """Disconnect the main database and all extra databases concurrently."""
    disconnect_ops = [
        self.database.disconnect(),
        *(extra_db.disconnect() for extra_db in self.extra.values()),
    ]
    await asyncio.gather(*disconnect_ops)

asgi

asgi(app: None, handle_lifespan: bool = False) -> Callable[[ASGIApp], ASGIHelper]
asgi(app: ASGIApp, handle_lifespan: bool = False) -> ASGIHelper
asgi(app=None, handle_lifespan=False)

Return wrapper for asgi integration.

PARAMETER DESCRIPTION
app

TYPE: Optional[ASGIApp] DEFAULT: None

handle_lifespan

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/registry.py
582
583
584
585
586
587
588
589
590
def asgi(
    self,
    app: Optional[ASGIApp] = None,
    handle_lifespan: bool = False,
) -> Union[ASGIHelper, Callable[[ASGIApp], ASGIHelper]]:
    """
    Return wrapper for asgi integration.

    With an `app` the `ASGIHelper` wrapping it is returned. Without one, a
    partial is returned which takes the app later (usable as decorator).
    """
    helper_options = {"registry": self, "handle_lifespan": handle_lifespan}
    if app is None:
        return partial(ASGIHelper, **helper_options)
    return ASGIHelper(app=app, **helper_options)

create_all async

create_all(refresh_metadata=True, databases=(None,))
PARAMETER DESCRIPTION
refresh_metadata

TYPE: bool DEFAULT: True

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/registry.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
async def create_all(
    self, refresh_metadata: bool = True, databases: Sequence[Union[str, None]] = (None,)
) -> None:
    """
    Create the tables of the registry.

    Args:
        refresh_metadata: Rebuild the metadata first (with `multi_schema=True`).
        databases: Names of the databases to operate on, None is the main database.
    """
    # otherwise old references to non-existing tables, fks can lurk around
    if refresh_metadata:
        self.refresh_metadata(multi_schema=True)

    if not self.db_schema:
        # fallback when no schemes are in use. Because not all dbs support schemes
        # we cannot just use a scheme = ""
        for database_name in databases:
            selected = self.extra[database_name] if database_name is not None else self.database
            # don't warn here about inperformance
            async with selected as connected_db:
                with connected_db.force_rollback(False):
                    await connected_db.create_all(self.metadata_by_name[database_name])
        return

    await self.schema.create_schema(
        self.db_schema, True, True, update_cache=True, databases=databases
    )

drop_all async

drop_all(databases=(None,))
PARAMETER DESCRIPTION
databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/registry.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
async def drop_all(self, databases: Sequence[Union[str, None]] = (None,)) -> None:
    """
    Drop the tables of the registry.

    With a `db_schema` the whole schema is dropped (cascading, only if it
    exists). Otherwise the tables of the metadata are dropped per database.

    Args:
        databases: Names of the databases to operate on, None is the main database.
    """
    if not self.db_schema:
        # fallback when no schemes are in use. Because not all dbs support schemes
        # we cannot just use a scheme = ""
        for database_name in databases:
            selected = self.extra[database_name] if database_name is not None else self.database
            # don't warn here about inperformance
            async with selected as connected_db:
                with connected_db.force_rollback(False):
                    await connected_db.drop_all(self.metadata_by_name[database_name])
        return

    await self.schema.drop_schema(
        self.db_schema, cascade=True, if_exists=True, databases=databases
    )