Skip to content

Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

Object that bundles all schema operations.

Every operation that concerns a database schema is exposed through this single object.

PARAMETER DESCRIPTION
registry

TYPE: Registry

Source code in edgy/core/connection/schemas.py
25
26
def __init__(self, registry: "Registry") -> None:
    """
    Store the registry this schema helper operates on.

    Args:
        registry: Registry that is kept on the instance as ``self.registry``.
    """
    self.registry = registry

_default_schema instance-attribute

_default_schema

registry instance-attribute

registry = registry

database property

database

get_default_schema

get_default_schema()

Returns the default schema, which is usually None.

Source code in edgy/core/connection/schemas.py
32
33
34
35
36
37
38
def get_default_schema(self) -> Optional[str]:
    """
    Return the default schema name of the database dialect (usually None).

    The value is looked up once and then cached on the instance as
    ``_default_schema``; later calls return the cached value, even if it is None.
    """
    try:
        # Fast path: the value was already resolved by an earlier call.
        return self._default_schema
    except AttributeError:
        pass
    dialect = self.database.url.sqla_url.get_dialect(True)
    self._default_schema = dialect.default_schema_name
    return self._default_schema

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)
PARAMETER DESCRIPTION
database

TYPE: Database

schema

TYPE: str

is_shared

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/schemas.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def activate_schema_path(
    self, database: Database, schema: str, is_shared: bool = True
) -> None:
    """
    Set the ``search_path`` of the given database to ``schema``.

    Deprecated and insecure: ``schema`` is interpolated into the SQL statement
    without any escaping, so it must never come from untrusted input.

    Args:
        database: Database on which the statement is executed.
        schema: Schema name placed first in the search path (not escaped).
        is_shared: When true, ``shared`` is appended to the search path.
    """
    # INSECURE, but not used by default. Add warning
    # TODO: remove when there are no users of the method
    warnings.warn(
        "`activate_schema_path` is dangerous because the schema is not properly escaped and deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    search_path = f"{schema}, shared" if is_shared else f"{schema}"
    statement = sqlalchemy.text(f"SET search_path TO {search_path};")
    await database.execute(statement)

create_schema async

create_schema(schema, if_not_exists=False, init_models=False, init_tenant_models=False, update_cache=True, databases=(None,))

Creates a model schema if it does not exist.

PARAMETER DESCRIPTION
schema

TYPE: str

if_not_exists

TYPE: bool DEFAULT: False

init_models

TYPE: bool DEFAULT: False

init_tenant_models

TYPE: bool DEFAULT: False

update_cache

TYPE: bool DEFAULT: True

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/schemas.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def create_schema(
    self,
    schema: str,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Creates a model schema if it does not exist.

    Args:
        schema: Name of the schema to create.
        if_not_exists: Forwarded to ``CreateSchema`` and also used as
            ``checkfirst`` when tables are created.
        init_models: Build the tables of all registry models for ``schema`` and
            create them through ``registry.metadata_by_name`` of each database.
        init_tenant_models: Build the tables of the registry tenant models for
            ``schema``. When ``init_models`` is false they are created one by one.
        update_cache: Forwarded to ``table_schema`` of every model.
        databases: Database names to operate on. ``None`` selects
            ``registry.database``; other values are looked up in ``registry.extra``.

    Raises:
        SchemaError: If creating the schema fails with a ``ProgrammingError``.
    """
    from contextlib import AsyncExitStack

    tenant_tables: list[sqlalchemy.Table] = []
    if init_models:
        for model_class in self.registry.models.values():
            model_class.table_schema(schema=schema, update_cache=update_cache)
    if init_tenant_models:
        for model_class in self.registry.tenant_models.values():
            tenant_table = model_class.table_schema(schema=schema, update_cache=update_cache)
            # With init_models the tables are not collected here; presumably
            # the create_all call below covers them -- confirm against the registry.
            if not init_models:
                tenant_tables.append(tenant_table)

    def execute_create(connection: sqlalchemy.Connection, name: Optional[str]) -> None:
        try:
            connection.execute(
                sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)
            )
        except ProgrammingError as e:
            raise SchemaError(detail=e.orig.args[0]) from e
        for table in tenant_tables:
            table.create(connection, checkfirst=if_not_exists)
        if init_models:
            self.registry.metadata_by_name[name].create_all(
                connection, checkfirst=if_not_exists
            )

    # The awaitables returned by run_sync are only driven by gather(), so the
    # database contexts must stay open until gather() has finished. Leaving
    # them earlier would execute the DDL after the connection scope and the
    # force_rollback override have already been exited.
    async with AsyncExitStack() as stack:
        ops = []
        for database_name in databases:
            db = (
                self.registry.database
                if database_name is None
                else self.registry.extra[database_name]
            )
            # don't warn here about inperformance
            db = await stack.enter_async_context(db)
            stack.enter_context(db.force_rollback(False))
            ops.append(db.run_sync(execute_create, database_name))
        await asyncio.gather(*ops)

drop_schema async

drop_schema(schema, cascade=False, if_exists=False, databases=(None,))

Drops an existing model schema.

PARAMETER DESCRIPTION
schema

TYPE: str

cascade

TYPE: bool DEFAULT: False

if_exists

TYPE: bool DEFAULT: False

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None,)

Source code in edgy/core/connection/schemas.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
async def drop_schema(
    self,
    schema: str,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Drops an existing model schema.

    Args:
        schema: Name of the schema to drop.
        cascade: Forwarded to ``DropSchema``.
        if_exists: Forwarded to ``DropSchema``.
        databases: Database names to operate on. ``None`` selects
            ``registry.database``; other values are looked up in ``registry.extra``.

    Raises:
        SchemaError: If dropping the schema fails with a ``DBAPIError``.
    """
    from contextlib import AsyncExitStack

    def execute_drop(connection: sqlalchemy.Connection) -> None:
        try:
            connection.execute(
                sqlalchemy.schema.DropSchema(name=schema, cascade=cascade, if_exists=if_exists)
            )
        except DBAPIError as e:
            raise SchemaError(detail=e.orig.args[0]) from e

    # The awaitables returned by run_sync are only driven by gather(), so the
    # database contexts must stay open until gather() has finished. Leaving
    # them earlier would execute the DDL after the connection scope and the
    # force_rollback override have already been exited.
    async with AsyncExitStack() as stack:
        ops = []
        for database_name in databases:
            db = (
                self.registry.database
                if database_name is None
                else self.registry.extra[database_name]
            )
            # don't warn here about inperformance
            db = await stack.enter_async_context(db)
            stack.enter_context(db.force_rollback(False))
            ops.append(db.run_sync(execute_drop))
        await asyncio.gather(*ops)

get_metadata_of_all_schemes async

get_metadata_of_all_schemes(database, *, no_reflect=False)
PARAMETER DESCRIPTION
database

TYPE: Database

no_reflect

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/schemas.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def get_metadata_of_all_schemes(
    self, database: Database, *, no_reflect: bool = False
) -> tuple[sqlalchemy.MetaData, list[str]]:
    """
    Collect the schema names of ``database`` and optionally reflect its tables.

    Args:
        database: Database to inspect; it is entered as an async context.
        no_reflect: When true, only the schema names are collected and the
            returned metadata stays empty.

    Returns:
        A tuple of the metadata object and the list of schema names. The
        inspector's default schema is reported as an empty string.
    """
    known_tables = self.registry.get_tablenames()

    def is_registered(table_name: str, _metadata: object) -> bool:
        # Restrict reflection to tables the registry knows about.
        return table_name in known_tables

    async with database as database:
        schema_names: list[str] = []
        reflected = sqlalchemy.MetaData()

        def inspect_connection(connection: sqlalchemy.Connection) -> None:
            inspector = sqlalchemy.inspect(connection)
            default_name = inspector.default_schema_name
            # Replace the contents in place so the outer list sees the result.
            schema_names[:] = [
                "" if default_name == found else found
                for found in inspector.get_schema_names()
            ]
            if no_reflect:
                return
            for schema_name in schema_names:
                reflected.reflect(connection, schema=schema_name, only=is_registered)

        await database.run_sync(inspect_connection)
        return reflected, schema_names

get_schemes_tree async

get_schemes_tree(*, no_reflect=False)
PARAMETER DESCRIPTION
no_reflect

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/schemas.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
async def get_schemes_tree(
    self, *, no_reflect: bool = False
) -> dict[Union[str, None], tuple[str, sqlalchemy.MetaData, list[str]]]:
    """
    Build a mapping that describes the schemas of every registry database.

    Args:
        no_reflect: Forwarded to ``get_metadata_of_all_schemes``.

    Returns:
        A dict keyed by database name (``None`` for the main database, then the
        keys of ``registry.extra``). Each value is the database URL as a string
        followed by the result of ``get_metadata_of_all_schemes``.
    """

    async def describe(db_url: object, db: object) -> tuple:
        # Stringify the URL before awaiting, matching the original evaluation order.
        label = str(db_url)
        inspected = await self.get_metadata_of_all_schemes(db, no_reflect=no_reflect)
        return (label, *inspected)

    tree: dict[Union[str, None], tuple[str, sqlalchemy.MetaData, list[str]]] = {}
    tree[None] = await describe(self.database.url, self.registry.database)
    for name, extra_db in self.registry.extra.items():
        tree[name] = await describe(extra_db.url, extra_db)
    return tree