Skip to content

Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

All the schema operations object.

All the operations regarding a schema are placed in one object

PARAMETER DESCRIPTION
registry

TYPE: Registry

Source code in edgy/core/connection/schemas.py
25
26
def __init__(self, registry: "Registry") -> None:
    self.registry = registry

_default_schema instance-attribute

_default_schema

registry instance-attribute

registry = registry

database property

database

get_default_schema

get_default_schema()

Returns the default schema which is usually None

Source code in edgy/core/connection/schemas.py
32
33
34
35
36
37
38
def get_default_schema(self) -> Optional[str]:
    """
    Returns the default schema which is usually None
    """
    if not hasattr(self, "_default_schema"):
        self._default_schema = self.database.url.sqla_url.get_dialect(True).default_schema_name
    return self._default_schema

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)
PARAMETER DESCRIPTION
database

TYPE: Database

schema

TYPE: str

is_shared

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/schemas.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def activate_schema_path(
    self, database: Database, schema: str, is_shared: bool = True
) -> None:
    # INSECURE, but not used by default. Add warning
    # TODO: remove when there are no users of the method
    warnings.warn(
        "`activate_schema_path` is dangerous because the schema is not properly escaped and deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    path = (
        f"SET search_path TO {schema}, shared;"
        if is_shared
        else f"SET search_path TO {schema};"
    )
    expression = sqlalchemy.text(path)
    await database.execute(expression)

create_schema async

create_schema(schema, if_not_exists=False, init_models=False, init_tenant_models=False, update_cache=True, databases=(None))

Creates a model schema if it does not exist.

PARAMETER DESCRIPTION
schema

TYPE: str

if_not_exists

TYPE: bool DEFAULT: False

init_models

TYPE: bool DEFAULT: False

init_tenant_models

TYPE: bool DEFAULT: False

update_cache

TYPE: bool DEFAULT: True

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None)

Source code in edgy/core/connection/schemas.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def create_schema(
    self,
    schema: str,
    if_not_exists: bool = False,
    init_models: bool = False,
    init_tenant_models: bool = False,
    update_cache: bool = True,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Creates a model schema if it does not exist.
    """
    tables: list[sqlalchemy.Table] = []
    if init_models:
        for model_class in self.registry.models.values():
            model_class.table_schema(schema=schema, update_cache=update_cache)
    if init_tenant_models and init_models:
        for model_class in self.registry.tenant_models.values():
            model_class.table_schema(schema=schema, update_cache=update_cache)
    elif init_tenant_models:
        for model_class in self.registry.tenant_models.values():
            tables.append(model_class.table_schema(schema=schema, update_cache=update_cache))

    def execute_create(connection: sqlalchemy.Connection) -> None:
        try:
            connection.execute(
                sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)
            )
        except ProgrammingError as e:
            raise SchemaError(detail=e.orig.args[0]) from e
        for table in tables:
            table.create(connection, checkfirst=if_not_exists)
        if init_models:
            self.registry.metadata.create_all(connection, checkfirst=if_not_exists)

    ops = []
    for database in databases:
        db = self.registry.database if database is None else self.registry.extra[database]
        # don't warn here about inperformance
        async with db as db:
            with db.force_rollback(False):
                ops.append(db.run_sync(execute_create))
    await asyncio.gather(*ops)

drop_schema async

drop_schema(schema, cascade=False, if_exists=False, databases=(None))

Drops an existing model schema.

PARAMETER DESCRIPTION
schema

TYPE: str

cascade

TYPE: bool DEFAULT: False

if_exists

TYPE: bool DEFAULT: False

databases

TYPE: Sequence[Union[str, None]] DEFAULT: (None)

Source code in edgy/core/connection/schemas.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def drop_schema(
    self,
    schema: str,
    cascade: bool = False,
    if_exists: bool = False,
    databases: Sequence[Union[str, None]] = (None,),
) -> None:
    """
    Drops an existing model schema.
    """

    def execute_drop(connection: sqlalchemy.Connection) -> None:
        try:
            connection.execute(
                sqlalchemy.schema.DropSchema(name=schema, cascade=cascade, if_exists=if_exists)
            )
        except DBAPIError as e:
            raise SchemaError(detail=e.orig.args[0]) from e

    ops = []

    for database in databases:
        db = self.registry.database if database is None else self.registry.extra[database]
        # don't warn here about inperformance
        async with db as db:
            with db.force_rollback(False):
                ops.append(db.run_sync(execute_drop))
    await asyncio.gather(*ops)