Skip to content

Schema class

edgy.core.connection.schemas.Schema

Schema(registry)

Container object for all schema operations.

All operations regarding a schema are grouped in this one object.

PARAMETER DESCRIPTION
registry

TYPE: Type[Registry]

Source code in edgy/core/connection/schemas.py
20
21
def __init__(self, registry: Type["Registry"]) -> None:
    """
    Binds this schema helper to a registry.

    The given registry is stored unchanged on ``self.registry``.
    """
    self.registry = registry

registry instance-attribute

registry = registry

get_default_schema

get_default_schema()

Returns the default schema which is usually None

Source code in edgy/core/connection/schemas.py
23
24
25
26
27
def get_default_schema(self) -> str:
    """
    Returns the default schema name reported by the dialect of the
    registry's engine (usually None).
    """
    dialect = self.registry.engine.dialect
    # cast() only informs the type checker; the value is returned unchanged.
    return cast("str", dialect.default_schema_name)

activate_schema_path async

activate_schema_path(database, schema, is_shared=True)
PARAMETER DESCRIPTION
database

TYPE: Database

schema

TYPE: str

is_shared

TYPE: bool DEFAULT: True

Source code in edgy/core/connection/schemas.py
29
30
31
32
33
34
35
36
37
38
async def activate_schema_path(
    self, database: Database, schema: str, is_shared: bool = True
) -> None:
    """
    Executes a ``SET search_path`` statement for `schema` on `database`.

    When `is_shared` is true, ``shared`` is listed after `schema` in the
    search path; otherwise the search path contains only `schema`.
    """
    # NOTE(review): `schema` is interpolated verbatim into the SQL text --
    # confirm that callers only ever pass trusted schema names.
    if is_shared:
        statement = f"SET search_path TO {schema}, shared;"
    else:
        statement = f"SET search_path TO {schema};"
    await database.execute(sqlalchemy.text(statement))

create_schema async

create_schema(schema, if_not_exists=False)

Creates a model schema, optionally only if it does not already exist (see if_not_exists).

PARAMETER DESCRIPTION
schema

TYPE: str

if_not_exists

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/schemas.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
async def create_schema(self, schema: str, if_not_exists: bool = False) -> None:
    """
    Creates a model schema.

    `if_not_exists` is forwarded to the ``CreateSchema`` DDL element.
    A ProgrammingError raised while executing the statement is re-raised
    as a SchemaError whose detail is the first argument of the original
    driver error.
    """

    def _run_create(conn: sqlalchemy.Connection) -> None:
        ddl = sqlalchemy.schema.CreateSchema(name=schema, if_not_exists=if_not_exists)  # type: ignore
        try:
            conn.execute(ddl)
        except ProgrammingError as exc:
            raise SchemaError(detail=exc.orig.args[0]) from exc  # type: ignore

    # The registry database can also be a TestClient, so wrap it in a fresh
    # Database copy to avoid strange errors.
    async with Database(self.registry.database, force_rollback=False) as database:
        async with database.transaction():
            await database.run_sync(_run_create)

drop_schema async

drop_schema(schema, cascade=False, if_exists=False)

Drops an existing model schema.

PARAMETER DESCRIPTION
schema

TYPE: str

cascade

TYPE: bool DEFAULT: False

if_exists

TYPE: bool DEFAULT: False

Source code in edgy/core/connection/schemas.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def drop_schema(
    self, schema: str, cascade: bool = False, if_exists: bool = False
) -> None:
    """
    Drops an existing model schema.

    `cascade` and `if_exists` are forwarded to the ``DropSchema`` DDL
    element. A DBAPIError raised while executing the statement is
    re-raised as a SchemaError whose detail is the first argument of the
    original driver error.
    """

    def _run_drop(conn: sqlalchemy.Connection) -> None:
        ddl = sqlalchemy.schema.DropSchema(name=schema, cascade=cascade, if_exists=if_exists)  # type: ignore
        try:
            conn.execute(ddl)
        except DBAPIError as exc:
            raise SchemaError(detail=exc.orig.args[0]) from exc  # type: ignore

    # The registry database can also be a TestClient, so wrap it in a fresh
    # Database copy to avoid strange errors.
    async with Database(self.registry.database, force_rollback=False) as database:
        async with database.transaction():
            await database.run_sync(_run_drop)