Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

Manages all schema-related operations within the Edgy framework.

This class encapsulates functionalities for creating, dropping, and introspecting database schemas, ensuring proper model integration within multi-tenant or schema-isolated environments.

Initializes the Schema manager with a given registry.

PARAMETER DESCRIPTION
registry

The Edgy registry instance, providing access to models, databases, and other core components.

TYPE: Registry

Source code in edgy/core/connection/schemas.py
def __init__(self, registry: "Registry") -> None:
    """
    Initializes the Schema manager with a given registry.

    Args:
        registry: The Edgy registry instance, providing access to
                  models, databases, and other core components.
    """
    self.registry = registry

_default_schema instance-attribute

_default_schema

registry instance-attribute

registry = registry

database property

database

Provides direct access to the default database configured in the registry.

RETURNS DESCRIPTION
Database

The default database instance from the registry.

get_default_schema

get_default_schema()

Retrieves the default schema name from the underlying database dialect.

This method caches the default schema name after its first retrieval to optimize subsequent calls.

RETURNS DESCRIPTION
str | None

The name of the default schema, or None if not applicable or found.

Source code in edgy/core/connection/schemas.py
def get_default_schema(self) -> str | None:
    """
    Retrieves the default schema name from the underlying database dialect.

    This method caches the default schema name after its first retrieval
    to optimize subsequent calls.

    Returns:
        The name of the default schema, or None if not applicable or found.
    """
    # Check if the _default_schema attribute has already been set.
    if not hasattr(self, "_default_schema"):
        # If not set, retrieve the default schema name from the database URL's
        # SQLAlchemy dialect and store it.
        self._default_schema = self.database.url.sqla_url.get_dialect(True).default_schema_name
    # Return the cached default schema name.
    return self._default_schema
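
The caching described above relies on the attribute being absent until the first call. A minimal sketch of that pattern follows, using only the standard library; `DialectStub`, `SchemaSketch`, and the `lookups` counter are hypothetical names introduced for illustration and are not part of Edgy.

```python
from __future__ import annotations


class DialectStub:
    """Hypothetical stand-in for a SQLAlchemy dialect (assumption)."""

    default_schema_name = "public"


class SchemaSketch:
    """Illustrative only; this is not Edgy's Schema class."""

    def __init__(self) -> None:
        self.lookups = 0  # counts how often the dialect is consulted

    def _load_dialect(self) -> DialectStub:
        self.lookups += 1
        return DialectStub()

    def get_default_schema(self) -> str | None:
        # The attribute is assigned only on the first call, so hasattr()
        # acts as the "already cached?" check, even if the value is None.
        if not hasattr(self, "_default_schema"):
            self._default_schema = self._load_dialect().default_schema_name
        return self._default_schema


sketch = SchemaSketch()
first = sketch.get_default_schema()
second = sketch.get_default_schema()
print(first, second, sketch.lookups)
```

Checking with `hasattr` rather than comparing against `None` means a legitimately missing default schema (`None`) is also cached and not looked up again.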

create_schema async

create_schema(schema, if_not_exists=False, init_models=False, init_tenant_models=False, update_cache=True, databases=(None,))

Creates a new database schema and optionally initializes models within it.

This method handles the creation of a new schema and can populate it with tables for both regular models and tenant-specific models, respecting global field constraints.

PARAMETER DESCRIPTION
schema

The name of the schema to create, or None to target the main schema, in which case no schema is created.

TYPE: str | None

if_not_exists

If True, the schema will only be created if it does not already exist, preventing an error. Defaults to False.

TYPE: bool DEFAULT: False

init_models

If True, all models registered with the registry will have their tables created within the new schema. Defaults to False.

TYPE: bool DEFAULT: False

init_tenant_models

If True, tenant-specific models will have their tables created within the new schema. This operation temporarily bypasses global field constraints. Defaults to False.

TYPE: bool DEFAULT: False

update_cache

If True, the model's schema cache will be updated. Defaults to True.

TYPE: bool DEFAULT: True

databases

A sequence of database names (keys from registry.extra) or None for the default database, on which the schema should be created. Defaults to (None,), meaning only the default database.

TYPE: Sequence[str | None] DEFAULT: (None,)

RAISES DESCRIPTION
SchemaError

If there is an issue during schema creation or table initialization within the schema.

Source code in edgy/core/connection/schemas.py
async def create_schema(
    self,
    schema: str | None,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[str | None] = (None,),
) -> None:
    """
    Creates a new database schema and optionally initializes models within it.

    This method handles the creation of a new schema and can populate it
    with tables for both regular models and tenant-specific models,
    respecting global field constraints.

    Args:
        schema: The name of the schema to be created or None for main.
        if_not_exists: If True, the schema will only be created if it does
                       not already exist, preventing an error. Defaults to False.
        init_models: If True, all models registered with the registry will have
                     their tables created within the new schema. Defaults to False.
        init_tenant_models: If True, tenant-specific models will have their
                           tables created within the new schema. This operation
                           temporarily bypasses global field constraints. Defaults to False.
        update_cache: If True, the model's schema cache will be updated.
                      Defaults to True.
        databases: A sequence of database names (keys from `registry.extra`)
                   or None for the default database, on which the schema
                   should be created. Defaults to `(None,)`, meaning only
                   the default database.
    Raises:
        SchemaError: If there is an issue during schema creation or table
                     initialization within the schema.
    """
    tenant_tables: list[sqlalchemy.Table] = []
    # If init_models is True, iterate through all registered models and
    # update their table schema and cache.
    if init_models:
        for model_class in self.registry.models.values():
            # only init if no schema is set. This prevents problems with e.g. reflection.
            if model_class.__using_schema__ is Undefined:
                model_class.table_schema(schema=schema, update_cache=update_cache)

    # If init_tenant_models is True, handle the creation of tenant-specific model tables.
    if init_tenant_models:
        # Temporarily disable global field constraints for tenant model building.
        token = NO_GLOBAL_FIELD_CONSTRAINTS.set(True)
        try:
            # Iterate through tenant models and build their tables with the specified schema.
            for model_class in self.registry.tenant_models.values():
                tenant_tables.append(model_class.build(schema=schema))
        finally:
            # Ensure global field constraints are re-enabled after processing tenant models.
            NO_GLOBAL_FIELD_CONSTRAINTS.reset(token)

        # Perform a second pass to add global field constraints to tenant models.
        for model_class in self.registry.tenant_models.values():
            model_class.add_global_field_constraints(schema=schema)

    def execute_create(connection: sqlalchemy.Connection, name: str | None) -> None:
        """
        Internal helper function to execute the schema and table creation
        within a given database connection.
        """
        if schema is not None:
            try:
                # Attempt to create the schema.
                connection.execute(
                    sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)
                )
            except ProgrammingError as e:
                # Raise a SchemaError if there's a programming error during schema creation.
                raise SchemaError(detail=e.orig.args[0]) from e

        # If tenant_tables exist, create them within the schema.
        if tenant_tables:
            self.registry.metadata_by_name[name].create_all(
                connection, checkfirst=if_not_exists, tables=tenant_tables
            )
        # If init_models is True, create all registered model tables within the schema.
        if init_models:
            self.registry.metadata_by_name[name].create_all(
                connection, checkfirst=if_not_exists
            )

    # Iterate through the specified databases to perform schema creation.
    for database_name in databases:
        # Determine which database instance to use based on database_name.
        db = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        # Enter an asynchronous context for the database connection, disabling rollback.
        # This prevents a performance warning.
        async with db as db:
            with db.force_rollback(False):
                # Run the operation sequentially because we need the right context and
                # a connected database.
                await db.run_sync(execute_create, database_name)
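
The tenant-model step sets a context variable, builds the tables, and resets the variable in a `finally` block. The sketch below shows that set/reset pattern in isolation with the standard library's `contextvars`. `SKIP_CONSTRAINTS`, `build_table`, and `build_tenant_tables` are hypothetical names standing in for `NO_GLOBAL_FIELD_CONSTRAINTS` and the model build calls; it assumes `NO_GLOBAL_FIELD_CONSTRAINTS` behaves like a regular `ContextVar`, as the `set`/`reset` calls suggest.

```python
from contextvars import ContextVar

# Hypothetical flag playing the role of NO_GLOBAL_FIELD_CONSTRAINTS (assumption).
SKIP_CONSTRAINTS: ContextVar[bool] = ContextVar("SKIP_CONSTRAINTS", default=False)


def build_table(name: str) -> dict:
    # Stand-in builder that records whether constraints were skipped.
    return {"name": name, "constraints_skipped": SKIP_CONSTRAINTS.get()}


def build_tenant_tables(names: list[str]) -> list[dict]:
    token = SKIP_CONSTRAINTS.set(True)
    try:
        return [build_table(name) for name in names]
    finally:
        # reset() restores the previous value even if a build raised.
        SKIP_CONSTRAINTS.reset(token)


tables = build_tenant_tables(["users", "orders"])
after = build_table("audit")
print(tables, after)
```

Resetting through the token, rather than setting the flag back to `False`, restores whatever value was active before, so nested callers are not disturbed.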

drop_schema async

drop_schema(schema, cascade=False, if_exists=False, databases=(None,))

Drops an existing database schema, optionally cascading the drop to all contained objects.

PARAMETER DESCRIPTION
schema

The name of the schema to be dropped. If None, falls back to the classic drop_all logic.

TYPE: str | None

cascade

If True, all objects (tables, views, etc.) within the schema will also be dropped. Defaults to False.

TYPE: bool DEFAULT: False

if_exists

If True, the schema will only be dropped if it exists, preventing an error if it does not. Defaults to False.

TYPE: bool DEFAULT: False

databases

A sequence of database names (keys from registry.extra) or None for the default database, from which the schema should be dropped. Defaults to (None,), meaning only the default database.

TYPE: Sequence[str | None] DEFAULT: (None,)

RAISES DESCRIPTION
SchemaError

If there is an issue during the schema drop operation.

Source code in edgy/core/connection/schemas.py
async def drop_schema(
    self,
    schema: str | None,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[str | None] = (None,),
) -> None:
    """
    Drops an existing database schema, optionally cascading the drop
    to all contained objects.

    Args:
        schema: The name of the schema to be dropped. If None, falls back to the classic drop_all logic.
        cascade: If True, all objects (tables, views, etc.) within the
                 schema will also be dropped. Defaults to False.
        if_exists: If True, the schema will only be dropped if it exists,
                   preventing an error if it does not. Defaults to False.
        databases: A sequence of database names (keys from `registry.extra`)
                   or None for the default database, from which the schema
                   should be dropped. Defaults to `(None,)`, meaning only
                   the default database.
    Raises:
        SchemaError: If there is an issue during schema drop operation.
    """

    def execute_drop(connection: sqlalchemy.Connection) -> None:
        """
        Internal helper function to execute the schema drop
        within a given database connection.
        """
        try:
            # just remove the schema
            if schema is not None:
                # Attempt to drop the schema.
                connection.execute(
                    sqlalchemy.schema.DropSchema(
                        name=schema, cascade=cascade, if_exists=if_exists
                    )
                )
            else:
                # If no schema is given, drop all tables registered in the metadata.
                self.registry.metadata_by_name[database_name].drop_all(
                    connection, checkfirst=if_exists
                )
        except DBAPIError as e:
            # Raise a SchemaError if there's a database API error during schema drop.
            raise SchemaError(detail=e.orig.args[0]) from e

    # Iterate through the specified databases to perform schema drop.
    for database_name in databases:
        # Determine which database instance to use based on database_name.
        db = (
            self.registry.database
            if database_name is None
            else self.registry.extra[database_name]
        )
        # Enter an asynchronous context for the database connection, disabling rollback.
        # This prevents a performance warning.
        async with db as db:
            with db.force_rollback(False):
                # Run the operation sequentially because we need the right context and
                # a connected database.
                await db.run_sync(execute_drop)
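
The `databases` argument is resolved one entry at a time: `None` selects the default database and any other value is looked up in `registry.extra`. A minimal sketch of that selection and the sequential loop is shown below. `RegistryStub`, `pick_database`, and `drop_everywhere` are hypothetical names, and plain strings stand in for real database objects.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class RegistryStub:
    """Hypothetical stand-in exposing only `database` and `extra`."""

    database: str = "default-db"
    extra: dict[str, str] = field(default_factory=dict)


def pick_database(registry: RegistryStub, name: str | None) -> str:
    # None selects the default database; any other value is a key in `extra`.
    return registry.database if name is None else registry.extra[name]


async def drop_everywhere(registry: RegistryStub, databases=(None,)) -> list[str]:
    visited = []
    for name in databases:
        db = pick_database(registry, name)
        await asyncio.sleep(0)  # placeholder for the real awaited operation
        visited.append(db)
    return visited


registry = RegistryStub(extra={"analytics": "analytics-db"})
default_only = asyncio.run(drop_everywhere(registry))
both = asyncio.run(drop_everywhere(registry, databases=(None, "analytics")))
print(default_only, both)
```

With the default `(None,)` only the default database is visited; an unknown name raises `KeyError` from the `extra` lookup in this sketch.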

get_metadata_of_all_schemes async

get_metadata_of_all_schemes(database, *, no_reflect=False)

Retrieves metadata and a list of all schema names for a given database.

This method reflects the table structures for registered models within each discovered schema if no_reflect is False.

PARAMETER DESCRIPTION
database

The database instance from which to retrieve schema metadata.

TYPE: Database

no_reflect

If True, tables will not be reflected into the metadata. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[MetaData, list[str]]

A tuple containing:

- sqlalchemy.MetaData: An SQLAlchemy MetaData object populated with reflected tables (if no_reflect is False).
- list[str]: A list of schema names found in the database, with the default schema represented as an empty string.

Source code in edgy/core/connection/schemas.py
async def get_metadata_of_all_schemes(
    self, database: Database, *, no_reflect: bool = False
) -> tuple[sqlalchemy.MetaData, list[str]]:
    """
    Retrieves metadata and a list of all schema names for a given database.

    This method reflects the table structures for registered models
    within each discovered schema if `no_reflect` is False.

    Args:
        database: The database instance from which to retrieve schema metadata.
        no_reflect: If True, tables will not be reflected into the metadata.
                    Defaults to False.

    Returns:
        A tuple containing:
            - sqlalchemy.MetaData: An SQLAlchemy MetaData object populated
                                   with reflected tables (if `no_reflect` is False).
            - list[str]: A list of schema names found in the database.
    """
    # Get a set of all table names registered in the registry.
    tablenames = self.registry.get_tablenames()

    async with database as database:
        # Initialize an empty list to store schema names.
        list_schemes: list[str] = []
        # Create a new SQLAlchemy MetaData object.
        metadata = sqlalchemy.MetaData()
        # Force no rollback for the database connection.
        with database.force_rollback(False):

            def wrapper(connection: sqlalchemy.Connection) -> None:
                """
                Internal wrapper function to perform synchronous database
                inspection and reflection.
                """
                nonlocal list_schemes
                # Create an inspector from the database connection.
                inspector = sqlalchemy.inspect(connection)
                # Get the default schema name from the inspector.
                default_schema_name = inspector.default_schema_name
                # Get all schema names and replace the default schema name with an empty string.
                list_schemes = [
                    "" if default_schema_name == schema else schema
                    for schema in inspector.get_schema_names()
                ]
                # If no_reflect is False, reflect table metadata for each schema.
                if not no_reflect:
                    for schema in list_schemes:
                        metadata.reflect(
                            connection, schema=schema, only=lambda name, _: name in tablenames
                        )

            # Run the synchronous wrapper function on the database.
            await database.run_sync(wrapper)
            # Return the populated metadata and the list of schema names.
            return metadata, list_schemes
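
Two small pieces of logic in this method are easy to miss: the default schema name is replaced by an empty string, and reflection is limited to table names known to the registry. The sketch below reproduces both with plain Python. `normalize_schema_names` and `is_registered` are hypothetical helper names, and the schema and table names are made-up sample data.

```python
from __future__ import annotations


def normalize_schema_names(
    schema_names: list[str], default_schema_name: str | None
) -> list[str]:
    # The default schema is reported as "" so callers can treat it uniformly.
    return ["" if name == default_schema_name else name for name in schema_names]


# Sample set standing in for registry.get_tablenames() (assumption).
tablenames = {"users", "orders"}


def is_registered(name: str, _metadata: object) -> bool:
    # Same shape as the `only` callable passed to MetaData.reflect():
    # it receives the table name and the MetaData object.
    return name in tablenames


names = normalize_schema_names(["information_schema", "public", "tenant_a"], "public")
reflected = [t for t in ["users", "alembic_version", "orders"] if is_registered(t, None)]
print(names, reflected)
```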

get_schemes_tree async

get_schemes_tree(*, no_reflect=False)

Builds a comprehensive tree-like structure of schemas across all registered databases.

Each entry in the resulting dictionary represents a database (identified by its name or None for the default), containing its URL, its SQLAlchemy MetaData, and a list of schema names.

PARAMETER DESCRIPTION
no_reflect

If True, tables will not be reflected into the metadata for any schema. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str | None, tuple[str, MetaData, list[str]]]

A dictionary where keys are database names (or None for the default database) and values are tuples containing:

- str: The URL of the database.
- sqlalchemy.MetaData: The MetaData object for the database.
- list[str]: A list of schema names found in that database.

Source code in edgy/core/connection/schemas.py
async def get_schemes_tree(
    self, *, no_reflect: bool = False
) -> dict[str | None, tuple[str, sqlalchemy.MetaData, list[str]]]:
    """
    Builds a comprehensive tree-like structure of schemas across all
    registered databases.

    Each entry in the resulting dictionary represents a database (identified
    by its name or None for the default), containing its URL, its SQLAlchemy
    MetaData, and a list of schema names.

    Args:
        no_reflect: If True, tables will not be reflected into the metadata
                    for any schema. Defaults to False.

    Returns:
        A dictionary where keys are database names (or None for the default
        database) and values are tuples containing:
            - str: The URL of the database.
            - sqlalchemy.MetaData: The MetaData object for the database.
            - list[str]: A list of schema names found in that database.
    """
    # Initialize the schemes_tree dictionary.
    schemes_tree: dict[str | None, tuple[str, sqlalchemy.MetaData, list[str]]] = {
        None: (
            str(self.database.url),
            # Get metadata and schemes for the default database.
            *(
                await self.get_metadata_of_all_schemes(
                    self.registry.database, no_reflect=no_reflect
                )
            ),
        )
    }
    # Iterate through extra databases registered in the registry.
    for key, val in self.registry.extra.items():
        # Populate schemes_tree for each extra database.
        schemes_tree[key] = (
            str(val.url),
            # Get metadata and schemes for the current extra database.
            *(await self.get_metadata_of_all_schemes(val, no_reflect=no_reflect)),
        )
    # Return the complete schemes_tree.
    return schemes_tree
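
Each tree entry is built by placing the URL first and unpacking the awaited `(metadata, schemes)` pair after it. The sketch below shows that construction and how a caller might unpack an entry. `fake_metadata_of_all_schemes` and `build_tree` are hypothetical stand-ins, the URLs are placeholders, and strings replace real MetaData objects.

```python
import asyncio


async def fake_metadata_of_all_schemes(url):
    # Stand-in for get_metadata_of_all_schemes(); returns (metadata, schemes).
    return (f"metadata:{url}", ["", "tenant_a"])


async def build_tree(default_url, extra):
    # The starred expression flattens the awaited pair into the 3-tuple.
    tree = {None: (default_url, *(await fake_metadata_of_all_schemes(default_url)))}
    for key, url in extra.items():
        tree[key] = (url, *(await fake_metadata_of_all_schemes(url)))
    return tree


tree = asyncio.run(
    build_tree("postgresql://main", {"analytics": "postgresql://analytics"})
)
url, metadata, schemes = tree[None]  # the default database is keyed by None
print(url, metadata, schemes)
```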