Skip to content

Database class

edgy.Database

Database(url=None, *, force_rollback=None, config=None, **options)

An abstraction on the top of the EncodeORM databases.Database object.

This object allows to pass also a configuration dictionary in the format of

DATABASEZ_CONFIG = { "connection": { "credentials": { "scheme": 'sqlite', "postgres"... "host": ..., "port": ..., "user": ..., "password": ..., "database": ..., "options": { "driver": ... "ssl": ... } } } }

PARAMETER DESCRIPTION
url

TYPE: Optional[Union[str, DatabaseURL, URL, Database]] DEFAULT: None

force_rollback

TYPE: Union[bool, None] DEFAULT: None

config

TYPE: Optional['DictAny'] DEFAULT: None

**options

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def __init__(
    self,
    url: typing.Optional[typing.Union[str, DatabaseURL, URL, Database]] = None,
    *,
    force_rollback: typing.Union[bool, None] = None,
    config: typing.Optional["DictAny"] = None,
    **options: typing.Any,
):
    init()
    assert config is None or url is None, "Use either 'url' or 'config', not both."
    if isinstance(url, Database):
        assert not options, "Cannot specify options when copying a Database object."
        self.backend = url.backend.__copy__()
        self.url = url.url
        self.options = url.options
        self._call_hooks = url._call_hooks
        if force_rollback is None:
            force_rollback = bool(url.force_rollback)
    else:
        url = DatabaseURL(url)
        if config and "connection" in config:
            connection_config = config["connection"]
            if "credentials" in connection_config:
                connection_config = connection_config["credentials"]
                url = url.replace(**connection_config)
        self.backend, self.url, self.options = self.apply_database_url_and_options(
            url, **options
        )
        if force_rollback is None:
            force_rollback = False
    self._force_rollback = ForceRollback(force_rollback)
    self.backend.owner = self
    self._connection_map = weakref.WeakKeyDictionary()
    self._databases_map = {}

    # When `force_rollback=True` is used, we use a single global
    # connection, within a transaction that always rolls back.
    self._global_connection: typing.Optional[Connection] = None

    self.ref_counter: int = 0
    self.ref_lock: asyncio.Lock = asyncio.Lock()

_connection_map instance-attribute

_connection_map = WeakKeyDictionary()

_databases_map instance-attribute

_databases_map = {}

_loop class-attribute instance-attribute

_loop = None

backend instance-attribute

backend

url instance-attribute

url

options instance-attribute

options

is_connected class-attribute instance-attribute

is_connected = False

_call_hooks class-attribute instance-attribute

_call_hooks = True

_force_rollback instance-attribute

_force_rollback = ForceRollback(force_rollback)

force_rollback class-attribute instance-attribute

force_rollback = ForceRollbackDescriptor()

_global_connection instance-attribute

_global_connection = None

ref_counter instance-attribute

ref_counter = 0

ref_lock instance-attribute

ref_lock = Lock()

_current_task property

_current_task

_connection property writable

_connection

engine property

engine

__copy__

__copy__()
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
199
200
def __copy__(self) -> Database:
    return self.__class__(self)

inc_refcount async

inc_refcount()

Internal method to bump the ref_count.

Return True if ref_count is 0, False otherwise.

Should not be used outside of tests. Use connect and hooks instead. Not multithreading safe!

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
async def inc_refcount(self) -> bool:
    """
    Internal method to bump the ref_count.

    Return True if ref_count is 0, False otherwise.

    Should not be used outside of tests. Use connect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        self.ref_counter += 1
        # on the first call is count is 1 because of the former +1
        if self.ref_counter == 1:
            return True
    return False

decr_refcount async

decr_refcount()

Internal method to decrease the ref_count.

Return True if ref_count drops to 0, False otherwise.

Should not be used outside of tests. Use disconnect and hooks instead. Not multithreading safe!

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
async def decr_refcount(self) -> bool:
    """
    Internal method to decrease the ref_count.

    Return True if ref_count drops to 0, False otherwise.

    Should not be used outside of tests. Use disconnect and hooks instead.
    Not multithreading safe!
    """
    async with self.ref_lock:
        self.ref_counter -= 1
        # on the last call, the count is 0
        if self.ref_counter == 0:
            return True
    return False

connect_hook async

connect_hook()

Refcount protected connect hook. Executed begore engine and global connection setup.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
256
257
async def connect_hook(self) -> None:
    """Refcount protected connect hook. Executed begore engine and global connection setup."""

connect async

connect()

Establish the connection pool.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
async def connect(self) -> bool:
    """
    Establish the connection pool.
    """
    loop = asyncio.get_running_loop()
    if self._loop is not None and loop != self._loop:
        # copy when not in map
        if loop not in self._databases_map:
            # prevent side effects of connect_hook
            database = self.__copy__()
            database._call_hooks = False
            assert self._global_connection
            database._global_connection = await self._global_connection.__aenter__()
            self._databases_map[loop] = database
        # forward call
        return await self._databases_map[loop].connect()

    if not await self.inc_refcount():
        assert self.is_connected, "ref_count < 0"
        return False
    if self._call_hooks:
        try:
            await self.connect_hook()
        except BaseException as exc:
            await self.decr_refcount()
            raise exc
    self._loop = asyncio.get_event_loop()

    await self.backend.connect(self.url, **self.options)
    logger.info("Connected to database %s", self.url.obscure_password, extra=CONNECT_EXTRA)
    self.is_connected = True

    if self._global_connection is None:
        self._global_connection = Connection(self, self.backend, force_rollback=True)
    return True

disconnect_hook async

disconnect_hook()

Refcount protected disconnect hook. Executed after connection, engine cleanup.

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
295
296
async def disconnect_hook(self) -> None:
    """Refcount protected disconnect hook. Executed after connection, engine cleanup."""

disconnect async

disconnect(force=False, *, parent_database=None)

Close all connections in the connection pool.

PARAMETER DESCRIPTION
force

TYPE: bool DEFAULT: False

parent_database

TYPE: Optional[Database] DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
@multiloop_protector(True, inject_parent=True)
async def disconnect(
    self, force: bool = False, *, parent_database: typing.Optional[Database] = None
) -> bool:
    """
    Close all connections in the connection pool.
    """
    # parent_database is injected and should not be specified manually
    if not await self.decr_refcount() or force:
        if not self.is_connected:
            logger.debug("Already disconnected, skipping disconnection")
            return False
        if force:
            logger.warning("Force disconnect, despite refcount not 0")
        else:
            return False
    if parent_database is not None:
        loop = asyncio.get_running_loop()
        del parent_database._databases_map[loop]
    if force:
        for sub_database in self._databases_map.values():
            await sub_database.disconnect(True)
        self._databases_map = {}
    assert not self._databases_map, "sub databases still active"

    try:
        assert self._global_connection is not None
        await self._global_connection.__aexit__()
        self._global_connection = None
        self._connection = None
    finally:
        logger.info(
            "Disconnected from database %s",
            self.url.obscure_password,
            extra=DISCONNECT_EXTRA,
        )
        self.is_connected = False
        await self.backend.disconnect()
        self._loop = None
        if self._call_hooks:
            await self.disconnect_hook()
    return True

fetch_all async

fetch_all(query, values=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
356
357
358
359
360
361
362
363
@multiloop_protector(False)
async def fetch_all(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
) -> typing.List[interfaces.Record]:
    async with self.connection() as connection:
        return await connection.fetch_all(query, values)

fetch_one async

fetch_one(query, values=None, pos=0)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

pos

TYPE: int DEFAULT: 0

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
365
366
367
368
369
370
371
372
373
374
@multiloop_protector(False)
async def fetch_one(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    pos: int = 0,
) -> typing.Optional[interfaces.Record]:
    async with self.connection() as connection:
        return await connection.fetch_one(query, values, pos=pos)
        assert connection._connection_counter == 1

fetch_val async

fetch_val(query, values=None, column=0, pos=0)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

column

TYPE: Any DEFAULT: 0

pos

TYPE: int DEFAULT: 0

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
376
377
378
379
380
381
382
383
384
385
@multiloop_protector(False)
async def fetch_val(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    column: typing.Any = 0,
    pos: int = 0,
) -> typing.Any:
    async with self.connection() as connection:
        return await connection.fetch_val(query, values, column=column, pos=pos)

execute async

execute(query, values=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Any DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
387
388
389
390
391
392
393
394
@multiloop_protector(False)
async def execute(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Any = None,
) -> typing.Union[interfaces.Record, int]:
    async with self.connection() as connection:
        return await connection.execute(query, values)

execute_many async

execute_many(query, values=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Any DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
396
397
398
399
400
401
@multiloop_protector(False)
async def execute_many(
    self, query: typing.Union[ClauseElement, str], values: typing.Any = None
) -> typing.Union[typing.Sequence[interfaces.Record], int]:
    async with self.connection() as connection:
        return await connection.execute_many(query, values)

iterate async

iterate(query, values=None, chunk_size=None)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

chunk_size

TYPE: Optional[int] DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
403
404
405
406
407
408
409
410
411
412
@multiloop_protector(False)
async def iterate(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    chunk_size: typing.Optional[int] = None,
) -> typing.AsyncGenerator[interfaces.Record, None]:
    async with self.connection() as connection:
        async for record in connection.iterate(query, values, chunk_size):
            yield record

batched_iterate async

batched_iterate(query, values=None, batch_size=None, batch_wrapper=tuple)
PARAMETER DESCRIPTION
query

TYPE: Union[ClauseElement, str]

values

TYPE: Optional[dict] DEFAULT: None

batch_size

TYPE: Optional[int] DEFAULT: None

batch_wrapper

TYPE: Union[BatchCallable] DEFAULT: tuple

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
414
415
416
417
418
419
420
421
422
423
424
@multiloop_protector(False)
async def batched_iterate(
    self,
    query: typing.Union[ClauseElement, str],
    values: typing.Optional[dict] = None,
    batch_size: typing.Optional[int] = None,
    batch_wrapper: typing.Union[BatchCallable] = tuple,
) -> typing.AsyncGenerator[BatchCallableResult, None]:
    async with self.connection() as connection:
        async for records in connection.batched_iterate(query, values, batch_size):
            yield batch_wrapper(records)

transaction

transaction(*, force_rollback=False, **kwargs)
PARAMETER DESCRIPTION
force_rollback

TYPE: bool DEFAULT: False

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
426
427
428
@multiloop_protector(True)
def transaction(self, *, force_rollback: bool = False, **kwargs: typing.Any) -> "Transaction":
    return Transaction(self.connection, force_rollback=force_rollback, **kwargs)

run_sync async

run_sync(fn, *args, **kwargs)
PARAMETER DESCRIPTION
fn

TYPE: Callable[..., Any]

*args

TYPE: Any DEFAULT: ()

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
430
431
432
433
434
435
436
437
438
@multiloop_protector(False)
async def run_sync(
    self,
    fn: typing.Callable[..., typing.Any],
    *args: typing.Any,
    **kwargs: typing.Any,
) -> typing.Any:
    async with self.connection() as connection:
        return await connection.run_sync(fn, *args, **kwargs)

create_all async

create_all(meta, **kwargs)
PARAMETER DESCRIPTION
meta

TYPE: MetaData

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
440
441
442
443
@multiloop_protector(False)
async def create_all(self, meta: MetaData, **kwargs: typing.Any) -> None:
    async with self.connection() as connection:
        await connection.create_all(meta, **kwargs)

drop_all async

drop_all(meta, **kwargs)
PARAMETER DESCRIPTION
meta

TYPE: MetaData

**kwargs

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
445
446
447
448
@multiloop_protector(False)
async def drop_all(self, meta: MetaData, **kwargs: typing.Any) -> None:
    async with self.connection() as connection:
        await connection.drop_all(meta, **kwargs)

connection

connection()
Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
450
451
452
453
454
455
456
457
458
@multiloop_protector(False)
def connection(self) -> Connection:
    if self.force_rollback:
        return typing.cast(Connection, self._global_connection)

    if self._connection is None:
        _connection = self._connection = Connection(self, self.backend)
        return _connection
    return self._connection

get_backends classmethod

get_backends(scheme='', *, overwrite_paths=['databasez.overwrites'], database_name='Database', connection_name='Connection', transaction_name='Transaction', database_class=None, connection_class=None, transaction_class=None)
PARAMETER DESCRIPTION
scheme

TYPE: str DEFAULT: ''

overwrite_paths

TYPE: Sequence[str] DEFAULT: ['databasez.overwrites']

database_name

TYPE: str DEFAULT: 'Database'

connection_name

TYPE: str DEFAULT: 'Connection'

transaction_name

TYPE: str DEFAULT: 'Transaction'

database_class

TYPE: Optional[Type[DatabaseBackend]] DEFAULT: None

connection_class

TYPE: Optional[Type[ConnectionBackend]] DEFAULT: None

transaction_class

TYPE: Optional[Type[TransactionBackend]] DEFAULT: None

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
@classmethod
def get_backends(
    cls,
    # let scheme empty for direct imports
    scheme: str = "",
    *,
    overwrite_paths: typing.Sequence[str] = ["databasez.overwrites"],
    database_name: str = "Database",
    connection_name: str = "Connection",
    transaction_name: str = "Transaction",
    database_class: typing.Optional[typing.Type[interfaces.DatabaseBackend]] = None,
    connection_class: typing.Optional[typing.Type[interfaces.ConnectionBackend]] = None,
    transaction_class: typing.Optional[typing.Type[interfaces.TransactionBackend]] = None,
) -> typing.Tuple[
    typing.Type[interfaces.DatabaseBackend],
    typing.Type[interfaces.ConnectionBackend],
    typing.Type[interfaces.TransactionBackend],
]:
    module: typing.Any = None
    for overwrite_path in overwrite_paths:
        imp_path = f"{overwrite_path}.{scheme.replace('+', '_')}" if scheme else overwrite_path
        try:
            module = importlib.import_module(imp_path)
        except ImportError as exc:
            logging.debug(
                f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
            )
            if "+" in scheme:
                imp_path = f"{overwrite_path}.{scheme.split('+', 1)[0]}"
                try:
                    module = importlib.import_module(imp_path)
                except ImportError as exc:
                    logging.debug(
                        f'Import of "{imp_path}" failed. This is not an error.', exc_info=exc
                    )
        if module is not None:
            break
    database_class = getattr(module, database_name, database_class)
    assert database_class is not None and issubclass(
        database_class, interfaces.DatabaseBackend
    )
    connection_class = getattr(module, connection_name, connection_class)
    assert connection_class is not None and issubclass(
        connection_class, interfaces.ConnectionBackend
    )
    transaction_class = getattr(module, transaction_name, transaction_class)
    assert transaction_class is not None and issubclass(
        transaction_class, interfaces.TransactionBackend
    )
    return database_class, connection_class, transaction_class

apply_database_url_and_options classmethod

apply_database_url_and_options(url, *, overwrite_paths=['databasez.overwrites'], **options)
PARAMETER DESCRIPTION
url

TYPE: Union[DatabaseURL, str]

overwrite_paths

TYPE: Sequence[str] DEFAULT: ['databasez.overwrites']

**options

TYPE: Any DEFAULT: {}

Source code in .venv/lib/python3.11/site-packages/databasez/core/database.py
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
@classmethod
def apply_database_url_and_options(
    cls,
    url: typing.Union[DatabaseURL, str],
    *,
    overwrite_paths: typing.Sequence[str] = ["databasez.overwrites"],
    **options: typing.Any,
) -> typing.Tuple[interfaces.DatabaseBackend, DatabaseURL, typing.Dict[str, typing.Any]]:
    url = DatabaseURL(url)
    database_class, connection_class, transaction_class = cls.get_backends(
        url.scheme,
        database_class=default_database,
        connection_class=default_connection,
        transaction_class=default_transaction,
        overwrite_paths=overwrite_paths,
    )

    backend = database_class(
        connection_class=connection_class, transaction_class=transaction_class
    )
    url, options = backend.extract_options(url, **options)
    # check against transformed url
    assert url.sqla_url.get_dialect(True).is_async, f'Dialect: "{url.scheme}" is not async.'

    return backend, url, options