High-level API

Added in version 0.7.1.

The high-level API is a wrapper over the mid-level connection API, combined with a connection pool.

Pooling

The high-level API provides a connection pool that automatically maintains a certain amount of idle connections to the PostgreSQL server. As only one query can be issued to a server at any one time, this provides an effective way of performinng multiple queries concurrently.

Warning

These connections are PERSISTENT connections. The pool does not reap connections for idle connections. If you use something ala pgbouncer, that automatically closes idle connections, and your application is relatively low activity, there will be a cascading failure as broken connections are checked out of the pool and aren’t reconnected until an obvious error happens upon trying to query on a disconnected connection.

Connecting

To create a connection pool, use open_pool().

pool.open_pool(username, *, connection_count=None, port=5432, password=None, database=None, ssl_context=None)

Opens a new connection pool to a PostgreSQL server. This is an asynchronous context manager.

This takes the same arguments and keyworrd arguments as open_database_connection(), except for the optional connection_count parameter.

Parameters:

connection_count (int | None) – The ideal number of connections to keep open at any one time. The pool may shrink slightly as connections are closed due to network errors and aren’t immediately re-opened.

Return type:

AsyncGenerator[PooledDatabaseInterface]

By default, the connection count is (CPU_COUNT * 2) + 1.

Warning

Your connection count should be relatively low. The default is a very good idea for nearly all applications. Don’t change it unless you have the benchmarks to prove it’s a good idea. See: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing

class pg_purepy.pool.PooledDatabaseInterface(count, nursery, conn_fn)

Bases: object

Connection pool based PostgreSQL interface.

This is the API you should prefer for any real application as the single connection class cannot be used for concurrent access to the database.

property max_connections: int

The maximum number of connections this pool may have idle.

property idle_connections: int

The number of the connections that haven’t currently been checked out.

property waiting_tasks: int

The number of tasks that are currently waiting for a connection to be used.

Querying

The connection pool object has a similar high-level query API to the mid-level API.

async PooledDatabaseInterface.execute(query, *params, **kwargs)

Executes a query on the next available connection. See AsyncPostgresConnection.execute() for more information.

Return type:

int

async PooledDatabaseInterface.fetch(query, *params, **kwargs)

Fetches the result of a query on the next available connection. See AsyncPostgresConnection.fetch() for more information.

Return type:

list[DataRow]

async PooledDatabaseInterface.fetch_one(query, *params, **kwargs)

Like fetch(), but only returns one row. See AsyncPostgresConnection.fetch_one() for more information.

Return type:

DataRow

async PooledDatabaseInterface.fetch_one_or_none(query, *params, **kwargs)

See AsyncPostgresConnection.fetch_one_or_none().

Return type:

DataRow | None

If you wish to use a connection from the pool directly, use PooledDatabaseInterface.checkout_connection().

PooledDatabaseInterface.checkout_connection(start_new_transaction=False)

Checks out a single connection from the connection pool.

This is an asynchronous context manager that will automatically clean up the connection once the context manager block exits.

If start_new_transaction is True, this will issue a new transaction before yielding the connection.

Return type:

AsyncIterator[AsyncPostgresConnection]

Transactions

As two subsequent queries may not be on the same connection, transactions get tricky. For that end, the pool has a special PooledDatabaseInterface.checkout_in_transaction() method which checks out a connection for exclusive usage in a transaction block.

PooledDatabaseInterface.checkout_in_transaction()

Shortcut for checkout_connection(start_new_transaction=True).

Return type:

AsyncGenerator[AsyncPostgresConnection]

async with pool.checkout_in_transaction() as conn:
    await conn.fetch("insert into ...")

The transaction will be automatically committed or rolled back as appropriate at the end of the async with block, and the connection will not be reused until the checkout is done.

Converters

You can add converters like the other two APIs using PooledDatabaseInterface.add_converter(). This will add it to all open connections, as well as any future connections that may be opened.

PooledDatabaseInterface.add_converter(converter)

Registers a converter for all the connections on this pool.

Return type:

None

If you wish to automatically add the array converter for converting PostgreSQL arrays of a custom type that is converted, use PooledDatabaseInterface.add_converter_with_array().

async PooledDatabaseInterface.add_converter_with_array(converter, **kwargs)

Registers a converter and also registers the converter for arrays of its type.

Return type:

None

Cancellation

Application-level cancellation is supported automatically. If a query is cancelled via a cancel scope, a cancellation request will be issued to the server to avoid having to drain more events from the server if possible.

# timeout block will automatically cancel the query after a while
# and it will be returned to the pool for use (hopefully) immediately
with anyio.move_on_after(timeout):
    async for result in pool.fetch(really_long_query):
        await do_long_running_operation(result)

This also works automatically in transactions - the insertion will be cancelled and the transaction will be rolled back.