Skip to content

Model class

edgy.Model

Model(*args, **kwargs)

Bases: ModelRowMixin, DeclarativeMixin, EdgyBaseModel

Representation of an Edgy Model.

This also means it can generate declarative SQLAlchemy models from anywhere by calling the Model.declarative() function.

Example

import edgy
from edgy import Database, Registry

# Database handle pointing at a local SQLite file.
database = Database("sqlite:///db.sqlite")
# Registry bound to that database; models reference it through ``Meta.registry``.
models = Registry(database=database)


class User(edgy.Model):
    '''
    The User model to be created in the database as a table.
    If no name is provided in the Meta class, it will generate
    a "users" table for you.
    '''

    id: int = edgy.IntegerField(primary_key=True)
    is_active: bool = edgy.BooleanField(default=False)

    class Meta:
        registry = models
PARAMETER DESCRIPTION
*args

TYPE: Any DEFAULT: ()

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/base.py
64
65
66
67
68
69
70
71
72
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialise the model from keyword arguments.

    The optional ``__show_pk__`` keyword is removed from the input and stored on the
    instance. The remaining keywords are passed through ``transform_input`` (phase
    ``"creation"``), handed to the parent initialiser and then filtered by
    ``setup_model_from_kwargs`` to become the instance ``__dict__``.
    """
    show_pk = kwargs.pop("__show_pk__", False)
    transformed = self.transform_input(kwargs, phase="creation")
    super().__init__(**transformed)
    self.__dict__ = self.setup_model_from_kwargs(transformed)
    self.__show_pk__ = show_pk
    # always set them in __dict__ to prevent __getattr__ loop
    self._loaded_or_deleted = False
    self._return_load_coro_on_attr_access: bool = False

columns class-attribute

columns

query class-attribute

query = Manager()
query_related = RedirectManager(redirect_name='query')

meta class-attribute

meta = MetaInfo(None, abstract=True)

__parent__ class-attribute

__parent__ = None

__is_proxy_model__ class-attribute

__is_proxy_model__ = False

__reflected__ class-attribute

__reflected__ = False

proxy_model cached property

proxy_model

identifying_db_fields cached property

identifying_db_fields

The columns used for loading. Can be set per instance; defaults to pknames.

can_load property

can_load

table property writable

table

pkcolumns property

pkcolumns

pknames property

pknames

__proxy_model__ class-attribute

__proxy_model__ = None

__db_model__ class-attribute

__db_model__ = False

__using_schema__ class-attribute

__using_schema__ = None

__show_pk__ class-attribute instance-attribute

__show_pk__ = __show_pk__

_loaded_or_deleted class-attribute instance-attribute

_loaded_or_deleted = False

_return_load_coro_on_attr_access class-attribute instance-attribute

_return_load_coro_on_attr_access = False

signals property

signals

fields property

fields

Meta

abstract class-attribute instance-attribute

abstract = True

get_columns_for_name

get_columns_for_name(name)
PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
209
210
211
212
213
214
215
216
217
def get_columns_for_name(self, name: str) -> Sequence["sqlalchemy.Column"]:
    """
    Return the columns registered under ``name``.

    ``name`` is looked up first in ``meta.field_to_columns`` and then among the
    table's columns (returned as a one-element tuple). When neither lookup matches,
    the module-level ``_empty`` value is returned.
    """
    sa_table = self.table
    field_map = self.meta.field_to_columns
    if name in field_map:
        return field_map[name]
    if name in sa_table.columns:
        return (sa_table.columns[name],)
    return cast(Sequence["sqlalchemy.Column"], _empty)

identifying_clauses

identifying_clauses()
Source code in edgy/core/db/models/base.py
219
220
221
222
223
224
225
226
def identifying_clauses(self) -> Iterable[Any]:
    """
    Yield one ``column == value`` comparison per identifying column.

    For names that are known fields the stored value is first passed through the
    field's ``clean`` and one comparison is yielded per resulting column. Names
    without a field are compared directly against the column of the same name.
    """
    for db_field_name in self.identifying_db_fields:
        field_def = self.meta.fields.get(db_field_name)
        if field_def is None:
            yield getattr(self.table.columns, db_field_name) == self.__dict__[db_field_name]
            continue
        cleaned = field_def.clean(db_field_name, self.__dict__[db_field_name])
        for column_name, cleaned_value in cleaned.items():
            yield getattr(self.table.columns, column_name) == cleaned_value

generate_proxy_model classmethod

generate_proxy_model()

Generates a proxy model for each model. This proxy model is a simple shallow copy of the original model being generated.

Source code in edgy/core/db/models/base.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@classmethod
def generate_proxy_model(cls) -> Type["Model"]:
    """
    Return the proxy model for this model class.

    When ``__proxy_model__`` is already set it is returned as is. Otherwise a
    ``ProxyModel`` is built from shallow copies of the model's fields, its fields
    are generified and the resulting model class is returned.
    """
    existing = cls.__proxy_model__
    if existing:
        return existing

    field_copies = {}
    for field_name, field in cls.meta.fields.items():
        field_copies[field_name] = copy.copy(field)

    proxy = ProxyModel(
        name=cls.__name__,
        module=cls.__module__,
        metadata=cls.meta,
        definitions=field_copies,
    )
    proxy.build()
    generify_model_fields(proxy.model)
    return proxy.model

load_recursive async

load_recursive(only_needed=False, only_needed_nest=False, _seen=None)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

only_needed_nest

TYPE: bool DEFAULT: False

_seen

TYPE: Optional[Set[Any]] DEFAULT: None

Source code in edgy/core/db/models/base.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
async def load_recursive(
    self,
    only_needed: bool = False,
    only_needed_nest: bool = False,
    _seen: Optional[Set[Any]] = None,
) -> None:
    """
    Load this instance and, recursively, the instances behind its foreign keys.

    ``_seen`` collects the model keys already visited so that reference cycles stop.
    When ``only_needed_nest`` is set and the instance was already marked as loaded or
    deleted before this call, the foreign keys are not descended into.
    """
    own_key = self.create_model_key()
    if _seen is None:
        _seen = {own_key}
    elif own_key in _seen:
        return
    else:
        _seen.add(own_key)

    # remember the state before loading, load() may change it
    was_loaded = self._loaded_or_deleted
    if self.can_load:
        await self.load(only_needed)
    if only_needed_nest and was_loaded:
        return
    for fk_name in self.meta.foreign_key_fields:
        related = getattr(self, fk_name, None)
        if related is None:
            continue
        # if a subinstance is fully loaded stop
        await related.load_recursive(
            only_needed=only_needed, only_needed_nest=True, _seen=_seen
        )

model_dump

model_dump(show_pk=None, **kwargs)

An updated version of the model dump. It can always show the pk, handles the exclude attribute on fields correctly, and contains the custom logic for fields with getters.

PARAMETER DESCRIPTION
show_pk

TYPE: Union[bool, None] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Extra Args

show_pk: bool - Enforces showing the primary key in the model_dump.

Source code in edgy/core/db/models/base.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def model_dump(self, show_pk: Union[bool, None] = None, **kwargs: Any) -> Dict[str, Any]:
    """
    An updated version of the model dump.
    It can always show the pk, handles the exclude attribute on fields correctly and
    contains the custom logic for fields with getters.

    Extra Args:
        show_pk: bool - Enforces showing the primary key in the model_dump.

    Keyword arguments ``exclude``, ``include`` and ``mode`` are consumed here and
    forwarded to the parent ``model_dump``; all remaining keyword arguments are
    passed through unchanged.

    Returns:
        The dumped model as a dictionary. Fields listed in
        ``meta.special_getter_fields`` are serialized via their ``__get__`` and stored
        under their serialization alias, alias or field name (in that precedence).
    """
    # we want a copy
    exclude: Union[Set[str], Dict[str, Any], None] = kwargs.pop("exclude", None)
    if exclude is None:
        initial_full_field_exclude = _empty
        # must be writable
        exclude = set()
    elif isinstance(exclude, dict):
        # only entries mapped to True exclude the whole field
        initial_full_field_exclude = {k for k, v in exclude.items() if v is True}
        exclude = copy.copy(exclude)
    else:
        initial_full_field_exclude = set(exclude)
        exclude = copy.copy(initial_full_field_exclude)

    if isinstance(exclude, dict):
        exclude["__show_pk__"] = True
        for field_name in self.meta.excluded_fields:
            exclude[field_name] = True
    else:
        exclude.update(self.meta.special_getter_fields)
        exclude.update(self.meta.excluded_fields)
        exclude.add("__show_pk__")
    include: Union[Set[str], Dict[str, Any], None] = kwargs.pop("include", None)
    mode: Union[Literal["json", "python"], str] = kwargs.pop("mode", "python")

    should_show_pk = show_pk or self.__show_pk__
    model = dict(super().model_dump(exclude=exclude, include=include, mode=mode, **kwargs))
    # Workaround for metafields, computed field logic introduces many problems
    # so reimplement the logic here
    for field_name in self.meta.special_getter_fields:
        if field_name == "pk":
            continue
        # Resolve the field object first: the exclude flag lives on the field,
        # not on its name (a plain string never has an ``exclude`` attribute).
        field: BaseFieldType = self.meta.fields[field_name]
        if not should_show_pk or field_name not in self.pknames:
            if field_name in initial_full_field_exclude:
                continue
            if include is not None and field_name not in include:
                continue
            if getattr(field, "exclude", False):
                continue
        try:
            retval = field.__get__(self, self.__class__)
        except AttributeError:
            # the getter has no value for this instance, leave the field out
            continue
        sub_include = None
        if isinstance(include, dict):
            sub_include = include.get(field_name, None)
            if sub_include is True:
                sub_include = None
        sub_exclude = None
        if isinstance(exclude, dict):
            sub_exclude = exclude.get(field_name, None)
            if sub_exclude is True:
                sub_exclude = None
        if isinstance(retval, BaseModel):
            retval = retval.model_dump(
                include=sub_include, exclude=sub_exclude, mode=mode, **kwargs
            )
        else:
            assert (
                sub_include is None
            ), "sub include filters for CompositeField specified, but no Pydantic model is set"
            assert (
                sub_exclude is None
            ), "sub exclude filters for CompositeField specified, but no Pydantic model is set"
            if mode == "json" and not getattr(field, "unsafe_json_serialization", False):
                # skip field if it isn't a BaseModel and the mode is json and unsafe_json_serialization is not set
                # currently unsafe_json_serialization exists only on CompositeFields
                continue
        alias: str = field_name
        if getattr(field, "serialization_alias", None):
            alias = cast(str, field.serialization_alias)
        elif getattr(field, "alias", None):
            alias = field.alias
        model[alias] = retval
    # tenants cause excluded fields to reappear
    # TODO: find a better bugfix
    for excluded_field in self.meta.excluded_fields:
        model.pop(excluded_field, None)
    return model

build classmethod

build(schema=None)

Builds the SQLAlchemy table representation from the loaded fields.

PARAMETER DESCRIPTION
schema

TYPE: Optional[str] DEFAULT: None

Source code in edgy/core/db/models/base.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
@classmethod
def build(cls, schema: Optional[str] = None) -> sqlalchemy.Table:
    """
    Build the SQLAlchemy table for this model from its fields.

    The registry's metadata schema is set to ``schema`` or, when that is empty, to
    the registry's ``db_schema``. Columns and global constraints come from the
    fields; unique constraints and indexes come from ``meta.unique_together`` and
    ``meta.indexes``.
    """
    meta = cls.meta
    tablename: str = meta.tablename  # type: ignore
    registry = meta.registry
    assert registry is not None, "registry is not set"
    metadata: sqlalchemy.MetaData = cast("sqlalchemy.MetaData", registry._metadata)  # type: ignore
    metadata.schema = schema or registry.db_schema

    unique_together = meta.unique_together
    index_constraints = meta.indexes

    table_columns: List[sqlalchemy.Column] = []
    table_constraints: List[Any] = []
    for field_name, field in meta.fields.items():
        field_columns = field.get_columns(field_name)
        table_columns.extend(field_columns)
        table_constraints.extend(field.get_global_constraints(field_name, field_columns))

    # Handle the uniqueness together
    uniques = [cls._get_unique_constraints(entry) for entry in unique_together or []]

    # Handle the indexes
    indexes = [cls._get_indexes(entry) for entry in index_constraints or []]

    return sqlalchemy.Table(
        tablename,
        metadata,
        *table_columns,
        *uniques,
        *indexes,
        *table_constraints,
        extend_existing=True,
    )

extract_db_fields

extract_db_fields(only=None)

Extracts all the db fields, model references and fields. Related fields are not included because they are disjoint.

PARAMETER DESCRIPTION
only

TYPE: Optional[Sequence[str]] DEFAULT: None

Source code in edgy/core/db/models/types.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def extract_db_fields(self, only: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """
    Collect the instance values that belong to db fields, columns or model references.

    When ``only`` is given, exactly the instance attributes named in it are returned
    instead. Related fields are not included because they are disjoint.
    """
    known_fields = self.meta.fields
    references = self.meta.model_references
    table_columns = self.table.columns
    state = self.__dict__

    if only is not None:
        return {key: state[key] for key in state if key in only}

    extracted: Dict[str, Any] = {}
    for key, value in state.items():
        if key in known_fields or hasattr(table_columns, key) or key in references:
            extracted[key] = value
    return extracted

get_instance_name

get_instance_name()

Returns the name of the class in lowercase.

Source code in edgy/core/db/models/types.py
153
154
155
156
157
def get_instance_name(self) -> str:
    """
    Return the lowercased name of the instance's class.
    """
    class_name: str = self.__class__.__name__
    return class_name.lower()

create_model_key

create_model_key()

Build a cache key for the model.

Source code in edgy/core/db/models/types.py
159
160
161
162
163
164
165
166
167
def create_model_key(self) -> tuple:
    """
    Build a cache key for the model.

    The key is the class name followed by the stringified value of every attribute
    named in ``pkcolumns``.
    """
    # there are no columns, only column results
    pk_values = [str(getattr(self, pk_name)) for pk_name in self.pkcolumns]
    return (type(self).__name__, *pk_values)

extract_model_references

extract_model_references(extracted_values, model_class=None)

Extracts any possible model references from the EdgyModel and returns a dictionary.

PARAMETER DESCRIPTION
extracted_values

TYPE: Any

model_class

TYPE: Optional[Type[BaseFieldType]] DEFAULT: None

Source code in edgy/core/db/models/mixins/model_parser.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
def extract_model_references(
    self, extracted_values: Any, model_class: Optional[Type["BaseFieldType"]] = None
) -> Any:
    """
    Extracts any possible model references from the EdgyModel and returns a dictionary.

    Only references with a truthy value in ``extracted_values`` are included. The
    reference names are read from ``model_class`` when given, otherwise from ``self``.
    """
    owner = model_class or self
    references = {}
    for ref_name in owner.meta.model_references:  # type: ignore
        if extracted_values.get(ref_name):
            references[ref_name] = extracted_values.get(ref_name, None)
    return references

extract_column_values

extract_column_values(extracted_values, model_class=None, is_update=False, is_partial=False)

Extracts all the default values from the given fields and returns the raw value corresponding to each field.

Extract the model references.

PARAMETER DESCRIPTION
extracted_values

TYPE: Any

model_class

TYPE: Optional[Model] DEFAULT: None

is_update

TYPE: bool DEFAULT: False

is_partial

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/mixins/model_parser.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def extract_column_values(
    self,
    extracted_values: Any,
    model_class: Optional["Model"] = None,
    is_update: bool = False,
    is_partial: bool = False,
) -> Dict[str, Any]:
    """
    Validate the given values and fill in defaults, returning the raw value for
    each resulting key.

    Fields are read from ``model_class`` when given, otherwise from ``self``.
    Defaults are only injected when ``is_partial`` is false, or when the field sets
    ``inject_default_on_partial_update`` and ``is_update`` is true.

    Raises:
        ValueError: when two fields produce a value for the same key.
    """
    owner = model_class or self
    meta = owner.meta
    result: Dict[str, Any] = {}

    # phase 1: transform when required
    if meta.input_modifying_fields:
        # work on a copy, the modifiers change the mapping in place
        extracted_values = {**extracted_values}
        for modifier_name in meta.input_modifying_fields:
            meta.fields[modifier_name].modify_input(modifier_name, extracted_values)

    # phase 2: validate fields and set defaults for readonly
    deferred: List[BaseFieldType] = []
    for field_name, field in meta.fields.items():
        defaults_allowed = not is_partial or (
            field.inject_default_on_partial_update and is_update
        )
        if defaults_allowed and field.read_only:
            if field.has_default():
                result.update(field.get_default_values(field_name, result, is_update=is_update))
            continue
        if field_name in extracted_values:
            cleaned = field.clean(field_name, extracted_values[field_name])
            for sub_name, value in cleaned.items():
                if sub_name in result:
                    raise ValueError(f"value set twice for key: {sub_name}")
                result[sub_name] = value
        elif defaults_allowed and field.has_default():
            # add field without a value to the second pass (in case no value appears)
            # only include fields which have inject_default_on_partial_update set or if not is_partial
            deferred.append(field)

    # phase 3: set defaults for the rest if not partial or inject_default_on_partial_update
    for field in deferred:
        # check if field appeared e.g. by composite
        if field.name not in result:
            result.update(field.get_default_values(field.name, result, is_update=is_update))
    return result

transform_input classmethod

transform_input(kwargs, phase)

Expand to_model.

PARAMETER DESCRIPTION
kwargs

TYPE: Any

phase

TYPE: str

Source code in edgy/core/db/models/base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@classmethod
def transform_input(cls, kwargs: Any, phase: str) -> Any:
    """
    Expand the input through each field's ``to_model``.

    The input mapping is copied first. Input-modifying fields may then rewrite the
    copy, and every key that matches a field is replaced by the result of that
    field's ``to_model`` for the given ``phase``. Keys without a field are kept.
    """
    working = {**kwargs}
    field_defs = cls.meta.fields

    # phase 1: transform
    for modifier_name in cls.meta.input_modifying_fields:
        field_defs[modifier_name].modify_input(modifier_name, working)

    # phase 2: apply to_model
    expanded: Dict[str, Any] = {}
    for key, value in working.items():
        field = field_defs.get(key, None)
        if field is None:
            expanded[key] = value
            continue
        expanded.update(**field.to_model(key, value, phase=phase))
    return expanded

setup_model_from_kwargs

setup_model_from_kwargs(kwargs)

Loops and setup the kwargs of the model

PARAMETER DESCRIPTION
kwargs

TYPE: Any

Source code in edgy/core/db/models/base.py
 95
 96
 97
 98
 99
100
101
102
103
104
def setup_model_from_kwargs(self, kwargs: Any) -> Any:
    """
    Keep only the kwargs that name a field or a model reference of the model.
    """
    accepted = {}
    for key, value in kwargs.items():
        if key in self.meta.fields or key in self.meta.model_references:
            accepted[key] = value
    return accepted

_get_unique_constraints classmethod

_get_unique_constraints(columns)

Returns the unique constraints for the model.

The columns must be a string, a list or tuple of strings, or a UniqueConstraint object.

:return: Model UniqueConstraint.

PARAMETER DESCRIPTION
columns

TYPE: Union[Sequence, str, UniqueConstraint]

Source code in edgy/core/db/models/base.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
@classmethod
def _get_unique_constraints(
    cls, columns: Union[Sequence, str, sqlalchemy.UniqueConstraint]
) -> Optional[sqlalchemy.UniqueConstraint]:
    """
    Build the SQLAlchemy unique constraint for the model.

    ``columns`` must be a string, a list or tuple of strings, or a UniqueConstraint
    object. The name of a UniqueConstraint object is carried over.

    :return: Model UniqueConstraint.
    """
    options: Dict[str, Any] = {}
    if isinstance(columns, str):
        column_names: Sequence = (columns,)
    elif isinstance(columns, UniqueConstraint):
        column_names = columns.fields
        options["name"] = columns.name
    else:
        column_names = columns
    return sqlalchemy.UniqueConstraint(*column_names, **options)

_get_indexes classmethod

_get_indexes(index)

Creates the index based on the Index fields

PARAMETER DESCRIPTION
index

TYPE: Index

Source code in edgy/core/db/models/base.py
397
398
399
400
401
402
@classmethod
def _get_indexes(cls, index: Index) -> Optional[sqlalchemy.Index]:
    """
    Create the SQLAlchemy index from the name and fields of an ``Index``.
    """
    index_name = index.name
    indexed_fields = index.fields
    return sqlalchemy.Index(index_name, *indexed_fields)  # type: ignore

__setattr__

__setattr__(key, value)
PARAMETER DESCRIPTION
key

TYPE: str

value

TYPE: Any

Source code in edgy/core/db/models/base.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def __setattr__(self, key: str, value: Any) -> None:
    """
    Set an attribute, routing values of known fields through the field.

    Fields that define ``__set__`` receive the value directly. Other fields expand
    the value via ``to_model`` (phase ``"set"``) and every resulting pair is stored.
    Names that are not fields are stored unchanged.
    """
    field = self.meta.fields.get(key, None)
    if field is None:
        # bypass __setattr__
        edgy_setattr(self, key, value)
        return
    if hasattr(field, "__set__"):
        # not recommended, better to use to_model instead
        # used in related_fields to mask and not to implement to_model
        field.__set__(self, value)
        return
    for attr_name, attr_value in field.to_model(key, value, phase="set").items():
        # bypass __setattr__
        edgy_setattr(self, attr_name, attr_value)

_agetattr_helper async

_agetattr_helper(name)
PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
420
421
422
async def _agetattr_helper(self, name: str) -> Any:
    """
    Load the instance, then return the value stored under ``name`` in its ``__dict__``.
    """
    await self.load()
    instance_state = self.__dict__
    return instance_state[name]

__getattr__

__getattr__(name)

Does the following things: 1. Initializes managers on access. 2. Redirects get accesses to getter fields. 3. Runs a one-off query to populate any foreign key, making sure it runs only once per foreign key to avoid multiple database calls.

PARAMETER DESCRIPTION
name

TYPE: str

Source code in edgy/core/db/models/base.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def __getattr__(self, name: str) -> Any:
    """
    Fallback attribute lookup. Does the following things:

    1. Initializes managers on access: a copy of the class-level manager is bound
       to this instance and cached in ``__dict__``.
    2. Redirects get accesses to getter fields (fields which define ``__get__``).
    3. Runs a one-off load to populate a missing field value, making sure it only
       happens while the instance is not yet loaded or deleted, avoiding multiple
       database calls.

    When ``_return_load_coro_on_attr_access`` was set before the access, the load
    coroutine is returned instead of being run synchronously. The flag is reset on
    every call.
    """
    # remember the flag for this access only
    return_load_coro_on_attr_access = self._return_load_coro_on_attr_access
    # unset flag
    self._return_load_coro_on_attr_access = False
    manager = self.meta.managers.get(name)
    if manager is not None:
        if name not in self.__dict__:
            # bind a per-instance copy and cache it
            manager = copy.copy(manager)
            manager.instance = self
            self.__dict__[name] = manager
        return self.__dict__[name]

    field = self.meta.fields.get(name)
    if field is not None and hasattr(field, "__get__"):
        # no need to set a descriptor object
        return field.__get__(self, self.__class__)
    # lazy load: only for known, non-identifying fields of a loadable instance
    # which has not been loaded or deleted yet
    if (
        name not in self.__dict__
        and not self._loaded_or_deleted
        and field is not None
        and name not in self.identifying_db_fields
        and self.can_load
    ):
        coro = self._agetattr_helper(name)
        if return_load_coro_on_attr_access:
            return coro
        return run_sync(coro)
    return super().__getattr__(name)

__eq__

__eq__(other)
PARAMETER DESCRIPTION
other

TYPE: Any

Source code in edgy/core/db/models/base.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def __eq__(self, other: Any) -> bool:
    """
    Compare two model instances by registry, table name and primary key values.

    Objects without a ``meta`` attribute (``None``, plain values, ...) are never
    equal to a model; the comparison returns False for them instead of raising
    ``AttributeError``.

    The classes themselves are deliberately not compared: meta may get regenerated,
    so only the registry identity and the table name are checked.
    """
    other_meta = getattr(other, "meta", None)
    if other_meta is None:
        # not a model, cannot be equal
        return False
    # somehow meta gets regenerated, so just compare tablename and registry.
    if self.meta.registry is not other_meta.registry:
        return False
    if self.meta.tablename != other_meta.tablename:
        return False
    # both sides are cleaned with this instance's fields, restricted to its pk columns
    self_dict = self.extract_column_values(
        self.extract_db_fields(self.pkcolumns), is_partial=True
    )
    other_dict = self.extract_column_values(
        other.extract_db_fields(self.pkcolumns), is_partial=True
    )
    # a key missing on one side compares as None
    key_set = {*self_dict.keys(), *other_dict.keys()}
    for field_name in key_set:
        if self_dict.get(field_name) != other_dict.get(field_name):
            return False
    return True

declarative classmethod

declarative()
Source code in edgy/core/db/models/mixins/generics.py
12
13
14
@classmethod
def declarative(cls) -> Any:
    """
    Return the result of ``generate_model_declarative`` for this model class.
    """
    declarative_model = cls.generate_model_declarative()
    return declarative_model

generate_model_declarative classmethod

generate_model_declarative()

Transforms a core Edgy table into a Declarative model table.

Source code in edgy/core/db/models/mixins/generics.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@classmethod
def generate_model_declarative(cls) -> Any:
    """
    Transforms a core Edgy table into a Declarative model table.

    A class named after the model is created on top of the registry's declarative
    base, reusing the model's table. For every column carrying foreign keys a
    relationship to the target of the owning field is added as
    ``<column name>_relation``.

    Columns whose owning field cannot be resolved, or whose field has no ``to``
    target, are skipped instead of failing with ``AttributeError``.
    """
    Base = cls.meta.registry.declarative_base

    # Build the original table
    fields = {"__table__": cls.table}

    # Generate base
    model_table = type(cls.__name__, (Base,), fields)

    # Column name -> field name mapping, used when both names differ.
    # NOTE(review): assumes the mapping is populated once the table exists - confirm.
    columns_to_field = getattr(cls.meta, "columns_to_field", {})

    # Make sure if there are foreignkeys, builds the relationships
    for column in cls.table.columns:
        if not column.foreign_keys:
            continue

        # Maps the relationships with the foreign keys and related names
        field = cls.meta.fields.get(column.name)
        if field is None and column.name in columns_to_field:
            field = cls.meta.fields.get(columns_to_field[column.name])
        target = getattr(field, "to", None)
        if target is None:
            # no field (or no relation target) known for this column
            continue
        to = target.__name__ if inspect.isclass(target) else target
        mapped_model: Mapped[to] = relationship(to)  # type: ignore

        # Adds to the current model
        model_table.__mapper__.add_property(f"{column.name}_relation", mapped_model)

    return model_table

from_sqla_row async classmethod

from_sqla_row(row, select_related=None, prefetch_related=None, is_only_fields=False, only_fields=None, is_defer_fields=False, exclude_secrets=False, using_schema=None)

Class method to convert a SQLAlchemy Row result into an EdgyModel row type.

Loops through the select_related fields if the query comes from a select_related operation. Validates that the select_related and related_field exist inside the models.

When select_related and related_field exist for the same field being validated, the related field is ignored as it won't override the value already collected from the select_related.

If there is no select_related, then goes through the related field where it should only return the instance of the ForeignKey with the ID, making it lazy loaded.

:return: Model class.

PARAMETER DESCRIPTION
row

TYPE: Row

select_related

TYPE: Optional[Sequence[Any]] DEFAULT: None

prefetch_related

TYPE: Optional[Sequence[Prefetch]] DEFAULT: None

is_only_fields

TYPE: bool DEFAULT: False

only_fields

TYPE: Sequence[str] DEFAULT: None

is_defer_fields

TYPE: bool DEFAULT: False

exclude_secrets

TYPE: bool DEFAULT: False

using_schema

TYPE: Union[str, None] DEFAULT: None

Source code in edgy/core/db/models/mixins/row.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@classmethod
async def from_sqla_row(
    cls,
    row: "Row",
    select_related: Optional[Sequence[Any]] = None,
    prefetch_related: Optional[Sequence["Prefetch"]] = None,
    is_only_fields: bool = False,
    only_fields: Sequence[str] = None,
    is_defer_fields: bool = False,
    exclude_secrets: bool = False,
    using_schema: Union[str, None] = None,
) -> Optional["Model"]:
    """
    Class method to convert a SQLAlchemy Row result into a EdgyModel row type.

    Looping through select_related fields if the query comes from a select_related operation.
    Validates if exists the select_related and related_field inside the models.

    When select_related and related_field exist for the same field being validated, the related
    field is ignored as it won't override the value already collected from the select_related.

    If there is no select_related, then goes through the related field where it **should**
    only return the instance of the the ForeignKey with the ID, making it lazy loaded.

    :return: Model class.
    """
    item: Dict[str, Any] = {}
    select_related = select_related or []
    prefetch_related = prefetch_related or []
    secret_columns = set()
    if exclude_secrets:
        for name in cls.meta.secret_fields:
            secret_columns.update(cls.meta.field_to_column_names[name])

    for related in select_related:
        field_name = related.split("__", 1)[0]
        try:
            field = cls.meta.fields[field_name]
        except KeyError:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" does not exist on {cls}.'
            ) from None
        if isinstance(field, RelationshipField):
            model_class, _, remainder = field.traverse_field(related)
        else:
            raise QuerySetError(
                detail=f'Selected field "{field_name}" is not a RelationshipField on {cls}.'
            ) from None
        if remainder:
            item[field_name] = await model_class.from_sqla_row(
                row,
                select_related=[remainder],
                prefetch_related=prefetch_related,
                exclude_secrets=exclude_secrets,
                using_schema=using_schema,
            )
        else:
            item[field_name] = await model_class.from_sqla_row(
                row, exclude_secrets=exclude_secrets, using_schema=using_schema
            )
    # Populate the related names
    # Making sure if the model being queried is not inside a select related
    # This way it is not overritten by any value
    for related, foreign_key in cls.meta.foreign_key_fields.items():
        ignore_related: bool = cls.__should_ignore_related_name(related, select_related)
        if ignore_related or related in cls.meta.secret_fields:
            continue
        if related in item:
            continue

        if exclude_secrets and foreign_key.secret:
            continue
        columns_to_check = foreign_key.get_column_names(related)

        model_related = foreign_key.target

        # Apply the schema to the model
        model_related = cls.__apply_schema(model_related, using_schema)

        child_item = {}
        for column_name in columns_to_check:
            column = getattr(cls.table.columns, column_name, None)
            if column is not None and column in row._mapping:
                child_item[foreign_key.from_fk_field_name(related, column_name)] = (
                    row._mapping[column]
                )
        # Make sure we generate a temporary reduced model
        # For the related fields. We simply chnage the structure of the model
        # and rebuild it with the new fields.
        proxy_model = model_related.proxy_model(**child_item)
        proxy_model.identifying_db_fields = foreign_key.related_columns
        item[related] = proxy_model

    # Check for the only_fields
    _is_only = set()
    if is_only_fields:
        _is_only = {str(field) for field in (only_fields or row._mapping.keys())}
    # Pull out the regular column values.
    for column in cls.table.columns:
        # Making sure when a table is reflected, maps the right fields of the ReflectModel
        if _is_only and column.name not in _is_only:
            continue
        if column.name in secret_columns:
            continue
        if column.name not in cls.meta.columns_to_field:
            continue
        # set if not of an foreign key
        elif cls.meta.columns_to_field[column.name] not in item:
            if column in row._mapping:
                item[column.name] = row._mapping[column]
            # fallback if first is not working
            elif column.name in row._mapping:
                item[column.name] = row._mapping[column.name]
    model = (
        cast("Model", cls(**item))
        if not exclude_secrets and not is_defer_fields and not _is_only
        else cast("Model", cls.proxy_model(**item))
    )
    # Apply the schema to the model
    model = cls.__apply_schema(model, using_schema)

    # Handle prefetch related fields.
    await cls.__handle_prefetch_related(
        row=row, model=model, prefetch_related=prefetch_related
    )
    assert model.pk is not None
    return model

__apply_schema classmethod

__apply_schema(model, schema=None)
PARAMETER DESCRIPTION
model

TYPE: Model

schema

TYPE: Optional[str] DEFAULT: None

Source code in edgy/core/db/models/mixins/row.py
148
149
150
151
152
153
154
@classmethod
def __apply_schema(cls, model: "Model", schema: Optional[str] = None) -> "Model":
    """
    Rebuild the tables of ``model`` and of its proxy model for ``schema``.

    When ``schema`` is ``None`` nothing is rebuilt. The same ``model`` object is
    returned in both cases.
    """
    if schema is None:
        return model

    # Both the model and its proxy model get a table built for the schema.
    model.table = model.build(schema)
    model.proxy_model.table = model.proxy_model.build(schema)
    return model
__should_ignore_related_name(related_name, select_related)

Validates if it should populate the related field if select related is not considered.

PARAMETER DESCRIPTION
related_name

TYPE: str

select_related

TYPE: Sequence[str]

Source code in edgy/core/db/models/mixins/row.py
156
157
158
159
160
161
162
163
164
165
166
167
@classmethod
def __should_ignore_related_name(
    cls, related_name: str, select_related: Sequence[str]
) -> bool:
    """
    Tell whether ``related_name`` appears as one of the ``"__"``-separated
    segments of any entry in ``select_related``.
    """
    return any(related_name in path.split("__") for path in select_related)

create_model_key_from_sqla_row classmethod

create_model_key_from_sqla_row(row)

Build a cache key for the model.

PARAMETER DESCRIPTION
row

TYPE: Row

Source code in edgy/core/db/models/mixins/row.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@classmethod
def create_model_key_from_sqla_row(
    cls,
    row: "Row",
) -> tuple:
    """
    Build a cache key for the model out of a row.

    The key is a tuple made of the class name followed by the stringified
    value of each primary key column read from ``row``.
    """

    def _pk_value(name: str) -> str:
        # Prefer the lookup by table column; fall back to the plain name
        # when the row mapping does not know that column.
        try:
            return str(row._mapping[getattr(cls.table.columns, name)])
        except KeyError:
            return str(row._mapping[name])

    return (cls.__name__, *map(_pk_value, cls.pkcolumns))

__set_prefetch async classmethod

__set_prefetch(row, model, related)
PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

related

TYPE: Prefetch

Source code in edgy/core/db/models/mixins/row.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@classmethod
async def __set_prefetch(cls, row: "Row", model: "Model", related: "Prefetch") -> None:
    """
    Resolve a single prefetch for ``model`` and store it on ``related.to_attr``.

    When the prefetch is flagged as finished it is baked first, and the value is
    taken from ``related._baked_results`` if the model key is present there.
    Otherwise a queryset is filtered by the primary key values of ``row`` and the
    awaited result is stored.

    Raises:
        QuerySetError: If a queryset of another model class was supplied but
            no reverse path from it back to this model exists.
    """
    model_key = ()
    if related._is_finished:
        await related.init_bake(type(model))
        model_key = model.create_model_key()
    if model_key in related._baked_results:
        setattr(model, related.to_attr, related._baked_results[model_key])
        return

    # Restrict the prefetch query to the primary key values of this row.
    clauses = [
        getattr(model.table.columns, pkcol) == getattr(row, pkcol) for pkcol in cls.pkcolumns
    ]
    queryset = related.queryset
    if related._is_finished:
        assert queryset is not None, "Queryset is not set but _is_finished flag"
    else:
        check_prefetch_collision(model, related)
        crawl_result = crawl_relationship(
            model.__class__, related.related_name, traverse_last=True
        )
        if queryset is None:
            if crawl_result.reverse_path is False:
                queryset = model.__class__.query.all()
            else:
                queryset = crawl_result.model_class.query.all()

        if queryset.model_class == model.__class__:
            # queryset is of this model
            queryset = queryset.select_related(related.related_name)
            queryset.embed_parent = (related.related_name, "")
        elif crawl_result.reverse_path is False:
            # The exception used to be instantiated without being raised, so this
            # unsupported case silently fell through to the filter call below.
            raise QuerySetError(
                detail=(
                    "Creating a reverse path is not possible, unidirectional fields used. "
                    f"You may want to use as queryset a queryset of model class {model!r}."
                )
            )
        else:
            # queryset is of the target model
            queryset = queryset.select_related(crawl_result.reverse_path)
    setattr(model, related.to_attr, await queryset.filter(*clauses))
__handle_prefetch_related(row, model, prefetch_related)

Handles any prefetch related scenario from the model. Loads in advance all the models needed for a specific record.

Recursively checks for the related field and validates if there is any conflicting attribute. If there is, a QuerySetError is raised.

PARAMETER DESCRIPTION
row

TYPE: Row

model

TYPE: Model

prefetch_related

TYPE: Sequence[Prefetch]

Source code in edgy/core/db/models/mixins/row.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@classmethod
async def __handle_prefetch_related(
    cls,
    row: "Row",
    model: "Model",
    prefetch_related: Sequence["Prefetch"],
) -> None:
    """
    Run every requested prefetch for ``model``.

    Each entry is first checked for a name collision via
    ``check_prefetch_collision``; the prefetches are then awaited together.
    """
    pending = []
    for entry in prefetch_related:
        # Detect conflicting names as early as possible, before scheduling.
        check_prefetch_collision(model=model, related=entry)
        pending.append(cls.__set_prefetch(row=row, model=model, related=entry))

    if not pending:
        return
    await asyncio.gather(*pending)

update async

update(**kwargs)

Update operation of the database fields.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def update(self, **kwargs: Any) -> Any:
    """
    Update the database row of this instance with the given values.

    The ``pre_update`` and ``post_update`` signals are sent around the write.
    With no values given, no statement is executed. Afterwards the written
    values are set on the instance and many-relation values are saved.
    Returns the instance itself.
    """
    await self.meta.signals.pre_update.send_async(self.__class__, instance=self)

    # An empty update is allowed and simply skips the database round trip.
    if kwargs:
        kwargs = self.extract_column_values(
            extracted_values=kwargs, is_partial=True, is_update=True
        )
        statement = self.table.update().values(**kwargs)
        statement = statement.where(*self.identifying_clauses())
        await self.database.execute(statement)
    await self.meta.signals.post_update.send_async(self.__class__, instance=self)

    # Mirror the written values on the instance.
    for name, new_value in kwargs.items():
        setattr(self, name, new_value)

    for field_name in self.meta.fields:
        relation = self.__dict__.get(field_name)
        if not isinstance(relation, ManyRelationProtocol):
            continue
        relation.instance = self
        await relation.save_related()
    return self

delete async

delete()

Delete operation from the database

Source code in edgy/core/db/models/model.py
72
73
74
75
76
77
78
79
80
81
async def delete(self) -> None:
    """
    Delete the database row identified by this instance.

    The ``pre_delete`` signal is sent before and ``post_delete`` after the
    statement is executed.
    """
    await self.meta.signals.pre_delete.send_async(self.__class__, instance=self)

    statement = self.table.delete()
    statement = statement.where(*self.identifying_clauses())
    await self.database.execute(statement)

    # The row is gone, so flag the instance to stop further loading.
    self._loaded_or_deleted = True

    await self.meta.signals.post_delete.send_async(self.__class__, instance=self)

load async

load(only_needed=False)
PARAMETER DESCRIPTION
only_needed

TYPE: bool DEFAULT: False

Source code in edgy/core/db/models/model.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def load(self, only_needed: bool = False) -> None:
    """
    Refresh the instance attributes from its database row.

    With ``only_needed=True`` the fetch is skipped when the instance is already
    flagged as loaded or deleted.

    Raises:
        ObjectNotFound: If no row matches the identifying clauses.
    """
    if only_needed:
        if self._loaded_or_deleted:
            return

    # Select the row by the identifying clauses and fetch it.
    query = self.table.select().where(*self.identifying_clauses())
    fetched = await self.database.fetch_one(query)

    # A missing row means the object is no longer stored.
    if fetched is None:
        raise ObjectNotFound("row does not exist anymore")

    fresh_values = self.transform_input(dict(fetched._mapping), phase="load")
    self.__dict__.update(fresh_values)
    self._loaded_or_deleted = True

_save async

_save(**kwargs)

Performs the save instruction.

PARAMETER DESCRIPTION
**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
async def _save(self, **kwargs: Any) -> "Model":
    """
    Performs the save instruction.

    Inserts ``kwargs`` as a new row, sets the values returned by
    ``transform_input`` (phase ``"post_insert"``) on the instance, applies the
    value returned by the insert to the autoincrement column when that column was
    not passed explicitly, and finally saves many-relation values.

    Returns the instance itself.
    """
    expression = self.table.insert().values(**kwargs)
    # The value returned by execute is treated as the autoincrement result below.
    autoincrement_value = await self.database.execute(expression)
    transformed_kwargs = self.transform_input(kwargs, phase="post_insert")
    for k, v in transformed_kwargs.items():
        setattr(self, k, v)
    # sqlalchemy supports only one autoincrement column
    if autoincrement_value:
        column = self.table.autoincrement_column
        # When a Row is returned, pick the value of the autoincrement column out of it.
        if column is not None and isinstance(autoincrement_value, Row):
            autoincrement_value = autoincrement_value._mapping[column.name]
        # can be explicit set, which causes an invalid value returned
        if column is not None and column.key not in kwargs:
            setattr(self, column.key, autoincrement_value)
    # Bind many-relation values stored on the instance to it and save them.
    for field in self.meta.fields:
        _val = self.__dict__.get(field)
        if isinstance(_val, ManyRelationProtocol):
            _val.instance = self
            await _val.save_related()
    return self

save_model_references async

save_model_references(model_references, model_ref=None)

If there is any ModelRef declared in the model, it will generate the subsequent model reference records for that same model created.

PARAMETER DESCRIPTION
model_references

TYPE: Any

model_ref

TYPE: Any DEFAULT: None

Source code in edgy/core/db/models/model.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
async def save_model_references(self, model_references: Any, model_ref: Any = None) -> None:
    """
    If there is any ModelRef declared in the model, it will generate the subsequent model
    reference records for that same model created.

    Args:
        model_references: Iterable of references. Each entry is either a dict or an
            object exposing ``__model__`` and ``model_dump``.
        model_ref: Key into ``self.meta.model_references`` used to resolve dict
            entries into their ModelRef.

    Raises:
        RelationshipNotFound: If the referenced model has no foreign key whose
            target is the class of this instance.
    """

    for reference in model_references:
        from_dict = isinstance(reference, dict)
        if from_dict:
            model: Type[Model] = self.meta.model_references[model_ref].__model__  # type: ignore
        else:
            model = reference.__model__  # type: ignore

        # The target model may be declared by name; resolve it through the registry.
        if isinstance(model, str):
            model = self.meta.registry.models[model]  # type: ignore

        # If the reference did come in a dict format
        # It is necessary to convert into the original ModelRef.
        if from_dict:
            reference = self.meta.model_references[model_ref](**reference)  # type: ignore

        # Find the foreign key pointing back at this model; the last match wins.
        foreign_key_target_field = None
        for name, foreign_key in model.meta.foreign_key_fields.items():
            if foreign_key.target == self.__class__:
                foreign_key_target_field = name

        if not foreign_key_target_field:
            # `model` is already a class, so its own name is used here;
            # `model.__class__` would name the metaclass instead.
            raise RelationshipNotFound(
                f"There was no relationship found between '{model.__name__}' and {self.__class__.__name__}"
            )

        data = reference.model_dump(exclude={"__model__"})
        data[foreign_key_target_field] = self
        await model.query.create(**data)

save async

save(force_save=False, values=None, **kwargs)

Performs a save of a given model instance. When creating a user it will make sure it can update existing or create a new one.

PARAMETER DESCRIPTION
force_save

TYPE: bool DEFAULT: False

values

TYPE: Dict[str, Any] DEFAULT: None

**kwargs

TYPE: Any DEFAULT: {}

Source code in edgy/core/db/models/model.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def save(
    self,
    force_save: bool = False,
    values: Union[Dict[str, Any], None] = None,
    **kwargs: Any,
) -> Union["Model", Any]:
    """
    Performs a save of a given model instance.

    The instance is inserted when ``force_save`` is true or when an
    autoincrement primary key column has no value yet; otherwise the existing
    row is updated.

    Args:
        force_save: Insert a new row instead of updating.
        values: Optional values. On insert they are merged over the extracted
            field values; on update they are used instead of them, as a partial
            update.
        **kwargs: Overwritten by the extracted column values before being used.

    Returns:
        The instance itself.
    """
    await self.meta.signals.pre_save.send_async(self.__class__, instance=self)

    extracted_fields = self.extract_db_fields()

    for pkcolumn in self.__class__.pkcolumns:
        # should trigger load in case of identifying_db_fields
        if (
            getattr(self, pkcolumn, None) is None
            and self.table.columns[pkcolumn].autoincrement
        ):
            # An unset autoincrement key is dropped from the values and an
            # insert is forced.
            extracted_fields.pop(pkcolumn, None)
            force_save = True

    if force_save:
        if values:
            extracted_fields.update(values)
        # force save must ensure a complete mapping
        kwargs = self.extract_column_values(
            extracted_values=extracted_fields, is_partial=False, is_update=False
        )
        model_references = self.extract_model_references(extracted_fields)
        await self._save(**kwargs)
    else:
        # Broadcast the initial update details
        # Making sure it only updates the fields that should be updated
        # and excludes the fields with `auto_now` as true
        kwargs = self.extract_column_values(
            extracted_values=extracted_fields if values is None else values,
            is_update=True,
            is_partial=values is not None,
        )
        model_references = self.extract_model_references(
            extracted_fields if values is None else values
        )

        # NOTE(review): update() in this file also sends pre_update and post_update
        # itself, so receivers appear to be notified twice on this path - confirm
        # this is intended.
        await self.meta.signals.pre_update.send_async(
            self.__class__, instance=self, kwargs=kwargs
        )
        await self.update(**kwargs)

        # Broadcast the update complete
        await self.meta.signals.post_update.send_async(self.__class__, instance=self)

    # Save the model references
    if model_references:
        for model_ref, references in model_references.items():
            await self.save_model_references(references or [], model_ref=model_ref)

    # Ensure on access refresh the results is active
    self._loaded_or_deleted = False

    await self.meta.signals.post_save.send_async(self.__class__, instance=self)
    return self