Asynchronous connection API¶
This API combines the client of the low-level API with the asynchronous capabilities of
the anyio library to create an actual, networked client.
Warning
You probably don’t want to use this class directly. Instead, use the High-level API when possible. You should still read the documentation for this API, however, especially if you wish to use pooled transactions.
Connecting¶
In order to get anywhere, you need to actually connect to the server.
- pg_purepy.connection.open_database_connection(address_or_path, username, *, port=5432, password=None, database=None, ssl_context=None)¶
Opens a new connection to the PostgreSQL database server. This is an asynchronous context manager.
async with open_database_connection("localhost", username="postgres") as db: ...
Required parameters:
- Parameters:
- Return type:
Optional parameters:
- Parameters:
port (int) – The port to connect to. Ignored for unix sockets.
database (str|None) – The database to connect to. Defaults to the username.
ssl_context (SSLContext|None) – The SSL context to use for the TLS connection. Enables TLS if specified.
- class pg_purepy.connection.AsyncPostgresConnection(address_or_path, port, ssl_context, stream, state, block_transactions=False)¶
Bases: object
An asynchronous connection to a PostgreSQL server. See method documentation for more info.
This class should not be directly instantiated; instead, use open_database_connection().
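As a rough sketch of the optional parameters, the snippet below builds a verifying TLS context with the standard library and passes it as ssl_context. The helper names make_tls_context and connect_with_tls, the database name "app", and the query are illustrative assumptions, not part of pg-purepy. The library import sits inside the function only so that the sketch can be loaded without pg-purepy installed.

```python
import ssl
from typing import Optional


def make_tls_context(cafile: Optional[str] = None) -> ssl.SSLContext:
    """Build a client-side TLS context that verifies the server certificate."""
    # create_default_context() enables certificate and hostname verification.
    return ssl.create_default_context(cafile=cafile)


async def connect_with_tls(host: str, username: str, password: str) -> None:
    """Open a TLS-protected connection and run one query (illustrative only)."""
    # Imported here so that this sketch can be loaded without pg_purepy installed.
    from pg_purepy.connection import open_database_connection

    async with open_database_connection(
        host,
        username=username,
        password=password,
        port=5432,
        database="app",  # hypothetical database name
        ssl_context=make_tls_context(),
    ) as conn:
        print(await conn.fetch_one("select version();"))
```

Passing a CA bundle path as cafile would restrict trust to that bundle; with no argument, the system trust store is used.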
Querying¶
There are two ways to query a PostgreSQL database:
- Eager queries, which load all data into memory at once.
- Lazy queries, which let the client apply backpressure by iterating over every row as it arrives.
There are high-level APIs for both eager and lazy queries, which wrap a low-level API that allows finer control of the actual messages arriving.
Warning
Querying the server is always protected by an anyio.ResourceGuard. Attempting to use
the same connection simultaneously for two queries will raise an
anyio.BusyResourceError.
Changed in version 0.12.0: Changed the previous query lock to a conflict detector. This prevents nested queries from deadlocking.
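One way to share a single connection between tasks is to queue them on a lock, so the conflict detector is never triggered. The sketch below is an assumption-laden illustration: SerialisedConnection and FakeConnection are hypothetical names, the fake raises RuntimeError where the real connection would raise anyio.BusyResourceError, and asyncio.Lock is used to keep the example dependency-free (anyio.Lock would be the backend-neutral choice).

```python
import asyncio


class SerialisedConnection:
    """Wraps a connection-like object so that only one query runs at a time."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    async def fetch(self, query, *params, **kwargs):
        # Tasks wait on the lock instead of tripping the conflict detector.
        async with self._lock:
            return await self._conn.fetch(query, *params, **kwargs)


class FakeConnection:
    """Hypothetical stand-in for a connection: rejects overlapping queries."""

    def __init__(self):
        self._busy = False

    async def fetch(self, query, *params, **kwargs):
        if self._busy:
            # The real connection would raise anyio.BusyResourceError here.
            raise RuntimeError("connection is already running a query")
        self._busy = True
        try:
            await asyncio.sleep(0)  # yield to other tasks, like network I/O would
            return [query]
        finally:
            self._busy = False


async def demo():
    conn = SerialisedConnection(FakeConnection())
    # Both tasks share one connection; the lock makes them run back to back.
    return await asyncio.gather(conn.fetch("select 1;"), conn.fetch("select 2;"))
```

A connection pool remains the better fit when queries should genuinely run in parallel; the lock only serialises them.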
Querying, Eagerly¶
Whilst pg-purepy doesn’t export a DBAPI 2.0 API as such, there are four high-level functions
that resemble DBAPI. These functions are likely the most useful functions when querying,
but they are all eager functions and load the entire returned dataset into memory at once.
- async AsyncPostgresConnection.fetch(query, *params, max_rows=None, **kwargs)¶
Eagerly fetches the result of a query. This returns a list of DataRow objects.
If you wish to lazily load the results of a query, use query() instead.
- Parameters:
query (str|PreparedStatementInfo) – Either a str that contains the query text, or a PreparedStatementInfo that represents a pre-prepared query.
params (Any) – The positional arguments for the query.
kwargs (Any) – The colon-named (keyword) arguments for the query.
- Return type:
- async AsyncPostgresConnection.execute(query, *params, max_rows=None, **kwargs)¶
Executes a query, returning its row count. This will discard all data rows.
- Parameters:
query (str|PreparedStatementInfo) – Either a str that contains the query text, or a PreparedStatementInfo that represents a pre-prepared query.
params (Any) – The positional arguments for the query.
kwargs (Any) – The colon-named (keyword) arguments for the query.
- Return type:
- async AsyncPostgresConnection.fetch_one(query, *params, **kwargs)¶
Like fetch_one_or_none(), but raises MissingRowError if there’s no row to fetch.
- Return type:
- async AsyncPostgresConnection.fetch_one_or_none(query, *params, **kwargs)¶
Like fetch(), but only fetches one row, returning None if there is no row.
For example, to insert some data, check how many rows were inserted, and verify it with a select:
async with open_database_connection(...) as conn:
    inserted = await conn.execute("insert into some_table (...) values (...);")
    print(f"Inserted {inserted} rows")
    row = await conn.fetch_one("select count(*) from some_table;")
    assert row.data[0] == inserted
Warning
Eager functions only support one query at a time, due to limitations in API design and the underlying protocol.
Querying, Lazily¶
If you have large data sets, or want to query lazily for other reasons, then
query() can be used. This function is an
asynchronous context manager, returning a QueryResult.
- AsyncPostgresConnection.query(query, *params, max_rows=None, **kwargs)¶
Mid-level query API.
The query parameter can either be a string or a PreparedStatementInfo, as returned from AsyncPostgresConnection.create_prepared_statement(). If it is a string, and it has parameters, they can be provided as either positional arguments or as keyword arguments. If it is a pre-prepared statement, and it has parameters, they must be provided as positional arguments.
If keyword arguments are provided or a prepared statement is passed, an extended query with secure argument parsing will be used. Otherwise, a simple query will be used, which saves bandwidth over the extended query protocol.
If the server is currently in a failed transaction, then your query will be ignored. Make sure to issue a rollback beforehand, if needed.
This is an asynchronous context manager that yields a QueryResult, which can be asynchronously iterated over for the data rows of the query. Once all data rows have been iterated over, you can call QueryResult.consume_all() to get the total row count.
If max_rows is specified, then the query will only return up to that many rows. Otherwise, an unlimited number of rows may be returned.
If this connection is currently executing another query, this method will raise an anyio.BusyResourceError. The same is true of all other query methods that call this function; be sure to protect the connection with a lock or use a connection pool for multiple simultaneous connections.
- Return type:
- final class pg_purepy.connection.QueryResult(*, iterator)¶
Bases: AsyncIterator[DataRow]
Wraps the extended result of a query.
This is an asynchronous iterator; rows are retrieved with an async for. The number of rows seen by the iterator so far can be found with the seen_rows field, or the total number of rows can be eagerly fetched with consume_all() (but this will discard any further data rows).
Example usage:
async with conn.query("select * from some_table") as query:
    async for row in query:
        print(row.data)
    print("Total rows:", await query.consume_all())
Warning
The lazy functions only support one query at a time, due to limitations in API design and the underlying protocol.
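Because a QueryResult is an asynchronous iterator, rows can be handled in bounded chunks rather than one at a time or all at once. The following is a minimal sketch of that idea; process_in_batches, fake_rows, and the handler are hypothetical names, and the fake generator stands in for a real QueryResult so the example runs without a server.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List


async def process_in_batches(
    rows: AsyncIterator[Any],
    size: int,
    handle_batch: Callable[[List[Any]], Awaitable[None]],
) -> int:
    """Pass rows to `handle_batch` in lists of at most `size`; return the row count."""
    if size < 1:
        raise ValueError("size must be at least 1")
    total = 0
    batch: List[Any] = []
    async for row in rows:
        batch.append(row)
        if len(batch) == size:
            # No further rows are pulled until the handler finishes, which is
            # how backpressure reaches the server.
            await handle_batch(batch)
            total += len(batch)
            batch = []
    if batch:  # hand over the final, possibly shorter, batch
        await handle_batch(batch)
        total += len(batch)
    return total


async def fake_rows(count: int):
    """Hypothetical stand-in for a QueryResult: yields `count` integers."""
    for i in range(count):
        yield i
```

With a real connection, the call would sit inside the query block, for example `async with conn.query(...) as result: await process_in_batches(result, 500, handler)`, where the batch size of 500 is an arbitrary choice.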
Parameterised Queries¶
Parameterised queries are also supported, using either positional arguments or keyword arguments, in either eager loading mode or lazy loading mode.
Positional argument parameters follow the PostgreSQL parameter syntax, where parameters are
specified with $N, where N is the 1-based index of the parameter. Keyword argument parameters follow the
DBAPI colon-named syntax, where parameters are specified with :name, where name is the keyword
passed to the function.
Note
Internally, keyword argument parameters are converted into the positional format when creating the prepared statement. This means that only the positional format parameters are available when using explicitly created or loaded prepared statements.
selected = await conn.fetch("select * from some_table where column = :name;",
                            name=some_variable)
inserted = await conn.execute("insert into some_table (foo, bar) values ($1, $2);",
                              x, y)
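To illustrate the keyword-to-positional conversion mentioned in the note above, here is a simplified sketch of how :name placeholders could be rewritten as $N. This is not pg-purepy's actual implementation; to_positional is a hypothetical helper, and it does not attempt to skip placeholders that appear inside string literals or comments.

```python
import re
from typing import Any, Dict, List, Tuple

# Matches :name, but not the second colon of a PostgreSQL cast such as value::int.
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_positional(query: str, params: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Rewrite :name placeholders as $1, $2, ... and order the values to match."""
    order: List[str] = []

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for parameter :{name}")
        if name not in order:
            order.append(name)
        # PostgreSQL placeholders are numbered from 1; a repeated name reuses
        # the number it was first given.
        return f"${order.index(name) + 1}"

    rewritten = _NAMED_PARAM.sub(replace, query)
    return rewritten, [params[name] for name in order]
```

For instance, `to_positional("select :a, :b, :a", {"a": 1, "b": 2})` gives the query `select $1, $2, $1` with the values `[1, 2]`.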
Low-level querying¶
If, for some reason, you need to access the messages returned during a query cycle, you can
use the method lowlevel_query().
- async AsyncPostgresConnection.lowlevel_query(query, *params, max_rows=None, **kwargs)¶
Performs a query to the server.
This is an asynchronous generator; it lazily fetches raw messages from the server without processing. You almost definitely want to use query() instead.
- Return type:
This function yields out the raw PostgresMessage objects that are received from the
protocol, as well as handling any error responses.
Warning
As this is a raw asynchronous generator, this must be wrapped in an aclosing() block.
See https://github.com/python-trio/trio/issues/265.
async with aclosing(conn.lowlevel_query("select * from some_table")) as agen:
    async for message in agen:
        if isinstance(message, RowDescription):
            print("Got row description:", message)
        elif isinstance(message, DataRow):
            print("Got data row", message.data)
For most queries, this function will yield the following sequence of messages, in this order:
- A RowDescription instance (that may be empty).
- Zero to N DataRow instances.
- One CommandComplete instance.
The last message will always be a CommandComplete instance.
Prepared Statements¶
If you execute the same query many times, a pre-created prepared statement instance can be used instead of the implicit one created when performing queries with parameters.
- async AsyncPostgresConnection.create_prepared_statement(name, query)¶
Creates a prepared statement. This is part of the low-level query API.
- Parameters:
- Return type:
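A typical use is to prepare a statement once and then execute it in a loop. The sketch below shows that pattern against a fake connection; insert_names, the statement name "insert_name", the table layout, and FakeConnection are all hypothetical, and only the create_prepared_statement(name, query) and execute(query, *params) call shapes come from this page.

```python
import asyncio


async def insert_names(conn, names):
    """Prepare an insert once, then execute it for every name."""
    statement = await conn.create_prepared_statement(
        "insert_name",  # hypothetical statement name
        "insert into some_table (name) values ($1);",
    )
    inserted = 0
    for name in names:
        # Pre-prepared statements only accept positional parameters.
        inserted += await conn.execute(statement, name)
    return inserted


class FakeConnection:
    """Hypothetical stand-in that records calls instead of talking to a server."""

    def __init__(self):
        self.prepared = []
        self.executed = []

    async def create_prepared_statement(self, name, query):
        self.prepared.append((name, query))
        return name  # the real method returns a PreparedStatementInfo

    async def execute(self, query, *params, **kwargs):
        self.executed.append((query, params))
        return 1  # pretend each insert affected one row
```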
Error handling¶
The underlying low-level client reports server-side errors as ErrorOrNoticeResponse
instances, but the mid-level connection objects will turn these into proper exceptions in the
query functions.
All exceptions raised from ErrorResponses inherit from BaseDatabaseError.
- exception pg_purepy.messages.BaseDatabaseError(response, query=None)¶
Bases: PostgresqlError
An exception produced from the database, usually from an ErrorOrNoticeResponse message. This does NOT include things such as protocol parsing errors.
However, you shouldn’t catch this exception directly, as the client splits these errors into two subtypes:
recoverable errors via RecoverableDatabaseError, and unrecoverable errors via
UnrecoverableDatabaseError. A general rule is that you should only catch the recoverable
variant.
- exception pg_purepy.messages.RecoverableDatabaseError(response, query=None)¶
Bases: BaseDatabaseError
A subclass of BaseDatabaseError that the client may potentially recover from. Examples include query errors.
- exception pg_purepy.messages.UnrecoverableDatabaseError(response, query=None)¶
Bases: BaseDatabaseError
A subclass of BaseDatabaseError that the client must not recover from. This usually implies internal errors in the server.
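The rule of catching only the recoverable variant might look like the sketch below. The three exception classes are local stand-ins that mirror the documented hierarchy so the example runs without the library; real code would import them from pg_purepy.messages. execute_or_none and FakeConnection are hypothetical names.

```python
import asyncio


# Stand-ins mirroring the documented hierarchy; real code would import these
# names from pg_purepy.messages instead of defining them.
class BaseDatabaseError(Exception):
    pass


class RecoverableDatabaseError(BaseDatabaseError):
    pass


class UnrecoverableDatabaseError(BaseDatabaseError):
    pass


async def execute_or_none(conn, query, *params):
    """Run a statement, returning None instead of raising on a recoverable error."""
    try:
        return await conn.execute(query, *params)
    except RecoverableDatabaseError as exc:
        # For example, a query error: report it and carry on.
        print("query failed, continuing:", exc)
        return None
    # UnrecoverableDatabaseError is deliberately not caught, so it propagates.


class FakeConnection:
    """Hypothetical connection whose execute() raises a preset error, if any."""

    def __init__(self, error=None):
        self._error = error

    async def execute(self, query, *params):
        if self._error is not None:
            raise self._error
        return 1
```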
Transaction Helpers¶
The mid-level API does nothing with transactions by default, operating in autocommit mode. However,
it does supply a transaction helper which will automatically commit at the end of the async with
block, or roll back if an error happens.
- AsyncPostgresConnection.with_transaction()¶
Asynchronous context manager that automatically opens and closes a transaction.
- Return type:
Warning
This will NOT prevent different tasks from calling query functions inside your transaction. That would require overly complicated locking logic! Instead, wrap your acquisition of this inside a different lock, and guard all other transaction helpers with it.
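The locking advice in the warning could be sketched as follows: a shared lock is held for the whole transaction, so no other task that uses the same helper can interleave its queries. exclusive_transaction and FakeConnection are hypothetical names, the fake only records events instead of talking to a server, and asyncio.Lock is used to keep the sketch dependency-free (anyio.Lock would be the backend-neutral choice).

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def exclusive_transaction(conn, lock):
    """Hold `lock` for the whole transaction so other tasks cannot interleave."""
    async with lock:
        async with conn.with_transaction():
            yield conn


class FakeConnection:
    """Hypothetical stand-in that records transaction events in order."""

    def __init__(self):
        self.events = []

    @asynccontextmanager
    async def with_transaction(self):
        self.events.append("begin")
        try:
            yield
        except BaseException:
            self.events.append("rollback")
            raise
        else:
            self.events.append("commit")

    async def execute(self, query):
        await asyncio.sleep(0)  # give other tasks a chance to run
        self.events.append(query)
        return 1


async def demo():
    conn = FakeConnection()
    lock = asyncio.Lock()

    async def worker(query):
        async with exclusive_transaction(conn, lock) as tx:
            await tx.execute(query)

    await asyncio.gather(worker("q1"), worker("q2"))
    return conn.events
```

The protection only holds if every task goes through the same lock; a task that calls the connection directly bypasses it.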