Asynchronous connection API

This API combines the client of the low-level API with the asynchronous capabilities of the anyio library to create an actual, networked client.

Warning

You probably don’t want to use this class directly. Instead, use the High-level API when possible. You should still read the documentation for this API, however, especially if you wish to use pooled transactions.

Connecting

In order to get anywhere, you need to actually connect to the server.

pg_purepy.connection.open_database_connection(address_or_path, username, *, port=5432, password=None, database=None, ssl_context=None)

Opens a new connection to the PostgreSQL database server. This is an asynchronous context manager.

async with open_database_connection("localhost", username="postgres") as db:
    ...

Required parameters:

Parameters:
  • address_or_path (str | PathLike[str]) – The address of the server or the absolute path of its Unix socket.

  • username (str) – The username to authenticate with.

Return type:

AsyncGenerator[AsyncPostgresConnection]

Optional parameters:

Parameters:
  • port (int) – The port to connect to. Ignored for unix sockets.

  • password (str | None) – The password to authenticate with.

  • database (str | None) – The database to connect to. Defaults to the username.

  • ssl_context (SSLContext | None) – The SSL context to use for TLS connection. Enables TLS if specified.
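As a rough sketch of how the optional parameters fit together, the helper below collects them into keyword arguments and only adds ssl_context when TLS is wanted. The names build_connect_kwargs and check_connection are hypothetical and not part of pg-purepy; the library import is deferred so the first helper can be used on its own.

```python
import ssl


def build_connect_kwargs(*, password=None, database=None, port=5432, use_tls=False):
    """Collect the optional arguments for open_database_connection()."""
    kwargs = {"port": port, "password": password, "database": database}
    if use_tls:
        # Passing an SSLContext is what enables TLS, per the parameter list above.
        kwargs["ssl_context"] = ssl.create_default_context()
    return kwargs


async def check_connection(address_or_path, username, **kwargs):
    """Open a connection, report whether it is ready, then close it again."""
    # Imported lazily so build_connect_kwargs() works without pg-purepy installed.
    from pg_purepy.connection import open_database_connection

    async with open_database_connection(address_or_path, username=username, **kwargs) as conn:
        return conn.ready
```

From inside an async function this could be called as await check_connection("localhost", "postgres", **build_connect_kwargs(use_tls=True)).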

class pg_purepy.connection.AsyncPostgresConnection(address_or_path, port, ssl_context, stream, state, block_transactions=False)

Bases: object

An asynchronous connection to a PostgreSQL server. See method documentation for more info.

This class should not be directly instantiated; instead, use open_database_connection().

property ready: bool

If this connection is ready to send another query.

property in_transaction: bool

If this connection is currently in a SQL transaction.

property dead: bool

If this connection is unusable.

property connection_parameters: Mapping[str, str]

A read-only view of the current “connection parameters”.

This is a set of global variables set across a single PostgreSQL connection that control options such as the timezone or locale.

property server_timezone: str

The raw timezone for this connection.
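The state properties above can be combined into a quick health summary, for example when logging. This is only a sketch; describe_connection is a hypothetical helper and accepts any object exposing the same properties.

```python
def describe_connection(conn) -> str:
    """Summarise a connection using only its documented read-only properties."""
    if conn.dead:
        # A dead connection is unusable, so the other flags are not meaningful.
        return "dead"

    readiness = "ready" if conn.ready else "busy"
    state = "in transaction" if conn.in_transaction else "idle"
    return f"{readiness}, {state}, timezone={conn.server_timezone}"
```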

Querying

There are two ways to query a PostgreSQL database.

  • Eager queries, which load all data into memory at once.

  • Lazy queries, which let the client apply backpressure by iterating over each row as it arrives.

There are high-level APIs for both eager and lazy queries, which wrap a low-level API that allows finer control over the actual messages arriving.

Warning

Querying the server is always protected by an anyio.ResourceGuard. Attempting to use the same connection simultaneously for two queries will raise an anyio.BusyResourceError.

Changed in version 0.12.0: Changed the previous query lock to a conflict detector. This prevents nested queries from deadlocking.
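If several tasks must share one connection, one way to avoid the conflict error described above is to serialise access with a lock. The wrapper below is a minimal sketch, assuming the asyncio backend (under trio, anyio.Lock would play the same role); SerialisedConnection is a hypothetical name and not part of pg-purepy.

```python
import asyncio


class SerialisedConnection:
    """Wraps a connection so that only one eager query runs at a time."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    async def fetch(self, query, *params, **kwargs):
        # Tasks queue up on the lock instead of hitting the connection concurrently.
        async with self._lock:
            return await self._conn.fetch(query, *params, **kwargs)

    async def execute(self, query, *params, **kwargs):
        async with self._lock:
            return await self._conn.execute(query, *params, **kwargs)
```

A connection pool is usually the better choice when queries should actually run in parallel; the lock only makes them take turns.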

Querying, Eagerly

Whilst pg-purepy doesn’t export a DBAPI 2.0 API as such, there are three high-level functions that resemble DBAPI. These three functions are likely the most useful functions when querying, but they are all eager functions and load the entire returned dataset into memory at once.

async AsyncPostgresConnection.fetch(query, *params, max_rows=None, **kwargs)

Eagerly fetches the result of a query. This returns a list of DataRow objects.

If you wish to lazily load the results of a query, use query() instead.

Parameters:
  • query (str | PreparedStatementInfo) – Either a str that contains the query text, or a PreparedStatementInfo that represents a pre-prepared query.

  • params (Any) – The positional arguments for the query.

  • max_rows (int | None) – The maximum rows to return.

  • kwargs (Any) – The keyword arguments for the query, bound to its :name placeholders.

Return type:

list[DataRow]

async AsyncPostgresConnection.execute(query, *params, max_rows=None, **kwargs)

Executes a query, returning its row count. This will discard all data rows.

Parameters:
  • query (str | PreparedStatementInfo) – Either a str that contains the query text, or a PreparedStatementInfo that represents a pre-prepared query.

  • params (Any) – The positional arguments for the query.

  • max_rows (int | None) – The maximum rows to return.

  • kwargs (Any) – The keyword arguments for the query, bound to its :name placeholders.

Return type:

int

async AsyncPostgresConnection.fetch_one(query, *params, **kwargs)

Like fetch_one_or_none(), but raises MissingRowError if there’s no row to fetch.

Return type:

DataRow

async AsyncPostgresConnection.fetch_one_or_none(query, *params, **kwargs)

Like fetch(), but only fetches one row.

Return type:

DataRow | None

For example, to insert some data, check how many rows were inserted, and verify it with a select:

async with open_database_connection(...) as conn:
    inserted = await conn.execute("insert into some_table (...) values (...);")
    print(f"Inserted {inserted} rows")
    row = await conn.fetch_one("select count(*) from some_table;")
    assert row.data[0] == inserted
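When a missing row is an expected outcome rather than an error, fetch_one_or_none() avoids the exception handling that fetch_one() would need. A small sketch, where first_column_or is a hypothetical helper name:

```python
async def first_column_or(conn, query, *params, default=None):
    """Return the first column of the first row, or `default` if no row came back."""
    row = await conn.fetch_one_or_none(query, *params)
    if row is None:
        return default
    # DataRow exposes its column values through .data, as in the example above.
    return row.data[0]
```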

Warning

Eager functions only support one query at a time, due to limitations in API design and the underlying protocol.

Querying, Lazily

If you have large data sets, or want to query lazily for other reasons, then query() can be used. This function is an asynchronous context manager, returning a QueryResult.

AsyncPostgresConnection.query(query, *params, max_rows=None, **kwargs)

Mid-level query API.

The query parameter can either be a string or a PreparedStatementInfo, as returned from AsyncPostgresConnection.create_prepared_statement(). If it is a string, and it has parameters, they can be provided as either positional arguments or as keyword arguments. If it is a pre-prepared statement, and it has parameters, they must be provided as positional arguments.

If keyword arguments are provided or a prepared statement is passed, an extended query with secure argument parsing will be used. Otherwise, a simple query will be used, which saves bandwidth over the extended query protocol.

If the server is currently in a failed transaction, then your query will be ignored. Make sure to issue a rollback beforehand, if needed.

This is an asynchronous context manager that yields a QueryResult, which can be asynchronously iterated over for the data rows of the query. Once all data rows have been iterated over, you can call QueryResult.consume_all() to get the total row count.

If max_rows is specified, then the query will only return up to that many rows. Otherwise, an unlimited amount may potentially be returned.

If this connection is currently executing another query, this method will raise an anyio.BusyResourceError. The same is true of all other query methods that call this function; be sure to protect this type with a lock or use a connection pool for multiple simultaneous connections.

Return type:

AsyncGenerator[QueryResult]

final class pg_purepy.connection.QueryResult(*, iterator)

Bases: AsyncIterator[DataRow]

Wraps the extended result of a query.

This is an asynchronous iterator; rows are retrieved with an async for. The number of rows seen by the iterator so far can be found with the seen_rows field, or the total number of rows can be eagerly fetched with consume_all() (but this will discard any further data rows).

seen_rows: int

The number of rows that this query has seen so far.

async consume_all()

Consumes all rows for this result and returns the total row count.

Return type:

int

Example usage:

async with conn.query("select * from some_table;") as query:
    async for row in query:
        print(row.data)

    print("Total rows:", await query.consume_all())
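Lazy iteration makes it possible to process a large result set in bounded chunks. The sketch below groups rows into batches and hands each batch to a caller-supplied coroutine; process_in_batches and handle_batch are hypothetical names used for illustration only.

```python
async def process_in_batches(conn, query, batch_size, handle_batch):
    """Stream rows from `query`, passing them on in lists of at most `batch_size`."""
    batch = []
    async with conn.query(query) as result:
        async for row in result:
            batch.append(row.data)
            if len(batch) >= batch_size:
                # Awaiting here delays the next row, which is where backpressure applies.
                await handle_batch(batch)
                batch = []

        if batch:
            # Flush the final, possibly smaller, batch.
            await handle_batch(batch)

        return result.seen_rows
```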

Warning

The lazy functions only support one query at a time, due to limitations in API design and the underlying protocol.

Parameterised Queries

Parameterised queries are also supported, using either positional arguments or keyword arguments, in either eager loading mode or lazy loading mode.

Positional argument parameters follow the PostgreSQL parameter syntax, where parameters are specified with $N, where N is the 1-based index of the parameter (so $1 refers to the first positional argument). Keyword argument parameters follow the DBAPI colon-named syntax, where parameters are specified with :name, where name is the keyword passed to the function.

Note

Internally, keyword argument parameters are converted into the positional format when creating the prepared statement. This means that only the positional format parameters are available when using explicitly created or loaded prepared statements.

selected = await conn.fetch("select * from some_table where some_column = :name;",
                            name=some_variable)
inserted = await conn.execute("insert into some_table (foo, bar) values ($1, $2);",
                              x, y)
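To make the keyword-to-positional conversion mentioned in the note more concrete, here is an illustrative sketch of how such a rewrite could work. It is not pg-purepy's actual implementation; named_to_positional is a hypothetical function, and it deliberately ignores edge cases such as colons inside string literals.

```python
import re

# Matches :name, but not the second colon of a PostgreSQL cast such as value::int.
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_]\w*)")


def named_to_positional(query, params):
    """Rewrite :name placeholders to $N and return (query, ordered_values)."""
    order = []

    def replace(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"missing value for parameter :{name}")
        if name not in order:
            order.append(name)
        # PostgreSQL placeholders are 1-based.
        return f"${order.index(name) + 1}"

    converted = _NAMED_PARAM.sub(replace, query)
    return converted, [params[name] for name in order]
```

A repeated name reuses the same placeholder, so each value is only sent once.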

Low-level querying

If, for some reason, you need to access the messages returned during a query cycle, you can use the method lowlevel_query().

async AsyncPostgresConnection.lowlevel_query(query, *params, max_rows=None, **kwargs)

Performs a query to the server.

This is an asynchronous generator; it lazily fetches raw messages from the server without processing. You almost definitely want to use query() instead.

Return type:

AsyncGenerator[QueryResultMessage]

This function yields out the raw PostgresMessage objects that are received from the protocol, as well as handling any error responses.

Warning

As this is a raw asynchronous generator, this must be wrapped in an aclosing() block. See https://github.com/python-trio/trio/issues/265.

from contextlib import aclosing

async with aclosing(conn.lowlevel_query("select * from some_table;")) as agen:
    async for message in agen:
        if isinstance(message, RowDescription):
            print("Got row description:", message)

        elif isinstance(message, DataRow):
            print("Got data row", message.data)

For most queries, this function yields its messages in a fixed order. The last message will always be a CommandComplete instance.

Prepared Statements

If you execute a significant number of the same query, a pre-created prepared statement instance can be used instead of the implicit one created when performing queries with parameters.

async AsyncPostgresConnection.create_prepared_statement(name, query)

Creates a prepared statement. This is part of the low-level query API.

Parameters:
  • name (str) – The name of the prepared statement.

  • query (str) – The query to use.

Return type:

PreparedStatementInfo
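A typical use is to prepare a statement once and then execute it repeatedly with different positional arguments. The sketch below assumes a table some_table with a column foo; the statement name insert_foo and the helper insert_many are made up for illustration.

```python
async def insert_many(conn, values):
    """Insert each value with one pre-created prepared statement."""
    statement = await conn.create_prepared_statement(
        "insert_foo", "insert into some_table (foo) values ($1);"
    )

    inserted = 0
    for value in values:
        # Pre-prepared statements only accept positional arguments.
        inserted += await conn.execute(statement, value)
    return inserted
```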

Error handling

The underlying low-level client reports server-side errors as ErrorOrNoticeResponse instances, but the mid-level connection objects will turn these into proper exceptions in the query functions.

All exceptions raised from ErrorResponses inherit from BaseDatabaseError.

exception pg_purepy.messages.BaseDatabaseError(response, query=None)

Bases: PostgresqlError

An exception produced from the database, usually from an ErrorOrNoticeResponse message. This does NOT include things such as protocol parsing errors.

However, you shouldn’t catch this exception directly, as the client splits it into two subtypes: recoverable errors via RecoverableDatabaseError, and unrecoverable errors via UnrecoverableDatabaseError. As a general rule, you should only catch the recoverable variant.

exception pg_purepy.messages.RecoverableDatabaseError(response, query=None)

Bases: BaseDatabaseError

A subclass of BaseDatabaseError that the client may potentially recover from. Examples include query errors.

exception pg_purepy.messages.UnrecoverableDatabaseError(response, query=None)

Bases: BaseDatabaseError

A subclass of BaseDatabaseError that the client must not recover from. This usually implies internal errors in the server.
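In practice, the rule of thumb above means catching only the recoverable class and letting everything else propagate. The helper below is a sketch with a hypothetical name, try_execute; the exception type is passed in as the recoverable argument (for real use, that would be RecoverableDatabaseError) so that the helper itself has no import-time dependency on the library.

```python
async def try_execute(conn, query, *params, recoverable):
    """Run a statement, returning None if it fails with a recoverable error."""
    try:
        return await conn.execute(query, *params)
    except recoverable as exc:
        # The connection is still usable here; any other exception type propagates.
        print(f"Query failed but the connection is still usable: {exc}")
        return None
```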

Transaction Helpers

The mid-level API does nothing with transactions by default, operating in autocommit mode. However, it does supply a transaction helper which will automatically commit at the end of the async with block, or rollback if an error happens.

AsyncPostgresConnection.with_transaction()

Asynchronous context manager that automatically opens and closes a transaction.

Return type:

AsyncGenerator[None]

Warning

This will NOT prevent different tasks from calling query functions inside your transaction. Preventing that would require overly complicated locking logic! Instead, wrap your acquisition of this inside a separate lock, and guard all other transaction helpers with it.
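The locking advice in the warning could be followed along these lines. This is a minimal sketch, assuming the asyncio backend and a lock that every task sharing the connection agrees to use; run_in_transaction is a hypothetical helper, not part of pg-purepy.

```python
import asyncio


async def run_in_transaction(conn, lock: asyncio.Lock, statements):
    """Execute all statements in one transaction while holding a shared lock."""
    # The lock keeps other cooperating tasks off this connection for the
    # whole lifetime of the transaction, not just for a single query.
    async with lock:
        async with conn.with_transaction():
            total = 0
            for statement in statements:
                total += await conn.execute(statement)
            # Leaving the block normally commits; an exception rolls back instead.
            return total
```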