22. Concurrent Programming with asyncio and Pipelining
Concurrent programming with asyncio and Oracle Database Pipelining can significantly enhance the overall performance and responsiveness of applications.
22.1. Concurrent Programming with asyncio
The Asynchronous I/O (asyncio) Python library can be used with python-oracledb Thin mode for
concurrent programming. This library allows you to run operations concurrently,
for example to run a long-running operation in the background without blocking
the rest of the application. With asyncio, you can easily write concurrent code
with the async and await syntax. See Python’s Developing with asyncio
documentation for useful tips.
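As a minimal, database-free sketch of the idea, the following uses only the standard asyncio library. The coroutine names and sleep durations are illustrative assumptions, and asyncio.sleep() merely stands in for an operation that waits on I/O, such as a database round-trip.

```python
import asyncio

async def long_running_operation():
    # Stand-in for a slow I/O-bound call (for example, a database round-trip)
    await asyncio.sleep(0.2)
    return "background work done"

async def main():
    events = []
    # Schedule the slow operation; it starts running at the next await point
    task = asyncio.create_task(long_running_operation())
    # The rest of the application keeps running in the meantime
    for step in range(3):
        events.append(f"foreground step {step}")
        await asyncio.sleep(0.01)
    # Wait for the background result only when it is needed
    events.append(await task)
    return events

if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The foreground steps are recorded while the background task is still sleeping, which is the non-blocking behavior described above.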
The python-oracledb asynchronous API is a part of the standard python-oracledb module. All the synchronous methods that require a round-trip to the database have corresponding asynchronous counterparts. You can choose whether to use the synchronous API or the asynchronous API in your code. It is recommended to not use both at the same time in your application.
The asynchronous API classes are AsyncConnection, AsyncConnectionPool, AsyncCursor, and AsyncLOB.
Note
Concurrent programming with asyncio is only supported in python-oracledb Thin mode.
22.1.1. Connecting to Oracle Database Asynchronously
With python-oracledb, you can create an asynchronous connection to Oracle Database using either standalone connections or pooled connections. (For discussion of synchronous programming, see Connecting to Oracle Database.)
22.1.1.1. Standalone Connections
Standalone connections are useful for applications that need only a single connection to a database.
An asynchronous standalone connection can be created by calling the
asynchronous method oracledb.connect_async(), which establishes a
connection to the database and returns an AsyncConnection Object. Once connections are created, all objects created by these
connections follow the asynchronous programming model. Subject to appropriate
use of await for calls that require a round-trip to the database,
asynchronous connections are used in the same way that synchronous programs use
Standalone Connections.
Asynchronous connections should be released when they are no longer needed to ensure Oracle Database gracefully cleans up. A preferred method is to use an asynchronous context manager. For example:
import asyncio
import oracledb

async def main():
    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:
        with connection.cursor() as cursor:
            await cursor.execute("select user from dual")
            async for result in cursor:
                print(result)

asyncio.run(main())
This code ensures that once the block is completed, the connection is closed
and resources are reclaimed by the database. In addition, any attempt to use
the variable connection outside of the block will fail.
If you do not use a context manager, you should explicitly close connections when they are no longer needed, for example:
connection = await oracledb.connect_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb")
cursor = connection.cursor()
await cursor.execute("select user from dual")
async for result in cursor:
    print(result)
cursor.close()
await connection.close()
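When closing explicitly, it can help to place the close call in a finally clause so that it still runs if a statement raises an exception. The following is a sketch of that pattern only: StandInConnection is a hypothetical stand-in class (not part of python-oracledb) so that the example runs without a database, and its fetchone() deliberately fails to simulate an error.

```python
import asyncio

class StandInConnection:
    """Hypothetical stand-in for an AsyncConnection; not a python-oracledb class."""

    def __init__(self):
        self.closed = False

    async def fetchone(self, statement):
        # Simulate a statement that fails part-way through the work
        raise RuntimeError("simulated query failure")

    async def close(self):
        self.closed = True

async def run_query(connection):
    try:
        return await connection.fetchone("select user from dual")
    finally:
        # Runs whether or not the query raised an exception
        await connection.close()

async def main():
    connection = StandInConnection()
    try:
        await run_query(connection)
    except RuntimeError as exc:
        print("query failed:", exc)
    return connection.closed

if __name__ == "__main__":
    print("connection closed:", asyncio.run(main()))
```

With a real connection, the same try/finally shape would wrap the statements between connect_async() and close().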
22.1.1.2. Connection Pools
Connection pooling allows applications to create and maintain a pool of open connections to the database. Connection pooling is important for performance and scalability when applications need to handle a large number of users who do database work for short periods of time but have relatively long periods when the connections are not needed. The high availability features of pools also make small pools useful for applications that want a few connections available for infrequent use and require them to be immediately usable when acquired.
An asynchronous connection pool can be created by calling
oracledb.create_pool_async(), which returns an AsyncConnectionPool
Object. Note that this method is synchronous and does not use await.
Once the pool has been created, your application can get a
connection from it by calling AsyncConnectionPool.acquire(). After
your application has used a connection, it should be released back to the pool
to make it available for other users. This can be done by explicitly closing
the connection or by using an asynchronous context manager, for example:
import asyncio
import oracledb

async def main():
    pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb",
                                      min=1, max=4, increment=1)
    async with pool.acquire() as connection:
        with connection.cursor() as cursor:
            await cursor.execute("select user from dual")
            async for result in cursor:
                print(result)
    await pool.close()

asyncio.run(main())
22.1.2. Executing SQL Using Asynchronous Methods
This section covers executing SQL using the asynchronous programming model. For discussion of synchronous programming, see Executing SQL.
Your application communicates with Oracle Database by executing SQL
statements. Statements such as queries (statements beginning with SELECT or
WITH), Data Manipulation Language (DML), and Data Definition Language (DDL) are
executed using the asynchronous methods AsyncCursor.execute() or
AsyncCursor.executemany(). Rows can be iterated over, or fetched using
one of the methods AsyncCursor.fetchone(), AsyncCursor.fetchmany(), or
AsyncCursor.fetchall().
You can also use shortcut methods on the AsyncConnection object such as
AsyncConnection.execute() or AsyncConnection.executemany(). Rows can be
fetched using one of the shortcut methods AsyncConnection.fetchone(),
AsyncConnection.fetchmany(), AsyncConnection.fetchall(),
AsyncConnection.fetch_df_all(), or AsyncConnection.fetch_df_batches().
An example of using AsyncConnection.fetchall():
import asyncio
import oracledb

async def main():
    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:
        res = await connection.fetchall("select * from locations")
        print(res)

asyncio.run(main())
An example that uses asyncio for parallelization and shows the execution of multiple coroutines:
import asyncio
import oracledb

# Number of coroutines to run
CONCURRENCY = 5

# Query the unique session identifier/serial number combination of a connection
SQL = """SELECT UNIQUE CURRENT_TIMESTAMP AS CT, sid||'-'||serial# AS SIDSER
         FROM V$SESSION_CONNECT_INFO
         WHERE sid = SYS_CONTEXT('USERENV', 'SID')"""

# Show the unique session identifier/serial number of each connection that the
# pool opens
async def init_session(connection, requested_tag):
    res = await connection.fetchone(SQL)
    print(res[0].strftime("%H:%M:%S.%f"), '- init_session with SID-SERIAL#', res[1])

# The coroutine simply shows the session identifier/serial number of the
# connection returned by the pool.acquire() call
async def query(pool):
    async with pool.acquire() as connection:
        await connection.callproc("dbms_session.sleep", [1])
        res = await connection.fetchone(SQL)
        print(res[0].strftime("%H:%M:%S.%f"), '- query with SID-SERIAL#', res[1])

async def main():
    pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb",
                                      min=1, max=CONCURRENCY,
                                      session_callback=init_session)
    coroutines = [ query(pool) for i in range(CONCURRENCY) ]
    await asyncio.gather(*coroutines)
    await pool.close()

asyncio.run(main())
When you run this, you will see that multiple connections (identified by the
unique Session Identifier and Serial Number combination) are opened and are
used by query(). For example:
12:09:29.711525 - init_session with SID-SERIAL# 36-38096
12:09:29.909769 - init_session with SID-SERIAL# 33-56225
12:09:30.085537 - init_session with SID-SERIAL# 14-31431
12:09:30.257232 - init_session with SID-SERIAL# 285-40270
12:09:30.434538 - init_session with SID-SERIAL# 282-32608
12:09:30.730166 - query with SID-SERIAL# 36-38096
12:09:30.933957 - query with SID-SERIAL# 33-56225
12:09:31.115008 - query with SID-SERIAL# 14-31431
12:09:31.283593 - query with SID-SERIAL# 285-40270
12:09:31.457474 - query with SID-SERIAL# 282-32608
Your results may vary depending on how fast your environment is.
See async_gather.py for a runnable example.
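The example above sizes the pool so that every coroutine can hold its own connection. As a rough, database-free model of what happens when there are more coroutines than connections, the sketch below uses an asyncio.Semaphore in place of the pool. This is only an analogy under stated assumptions: POOL_MAX and the stats dictionary are hypothetical names, the sleep stands in for the database call, and the model assumes that acquiring a connection waits until one is free rather than raising an error.

```python
import asyncio

POOL_MAX = 3       # plays the role of the pool's max parameter
CONCURRENCY = 6    # number of coroutines competing for connections

async def query(slots, stats):
    # Waiting on the semaphore models waiting for a free pooled connection
    async with slots:
        stats["in_use"] += 1
        stats["peak"] = max(stats["peak"], stats["in_use"])
        await asyncio.sleep(0.05)   # stand-in for the database call
        stats["in_use"] -= 1

async def main():
    slots = asyncio.Semaphore(POOL_MAX)
    stats = {"in_use": 0, "peak": 0}
    await asyncio.gather(*(query(slots, stats) for _ in range(CONCURRENCY)))
    return stats["peak"]

if __name__ == "__main__":
    print("Peak simultaneous 'connections':", asyncio.run(main()))
```

In this model the number of coroutines doing work at any moment never exceeds POOL_MAX, and the remaining coroutines proceed as slots are released.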
22.1.3. Managing Transactions Using Asynchronous Methods
This section covers managing transactions using the asynchronous programming model. For discussion of synchronous programming, see Managing Transactions.
When AsyncCursor.execute() or AsyncCursor.executemany()
executes a SQL statement, a transaction is started or continued. By default,
python-oracledb does not commit this transaction to the database. The methods
AsyncConnection.commit() and AsyncConnection.rollback()
can be used to explicitly commit or roll back a transaction:
async def main():
    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:
        with connection.cursor() as cursor:
            await cursor.execute("INSERT INTO mytab (name) VALUES ('John')")
            await connection.commit()
When a database connection is closed, such as with
AsyncConnection.close(), or when variables referencing the connection
go out of scope, any uncommitted transaction will be rolled back.
An alternative way to commit is to set the attribute
AsyncConnection.autocommit of the connection to True. This
ensures all DML statements (INSERT, UPDATE, and so on) are
committed as they are executed.
Note that irrespective of the autocommit value, Oracle Database will always commit an open transaction when a DDL statement is executed.
When executing multiple DML statements that constitute a single transaction, it is recommended to use autocommit mode only for the last DML statement in the sequence of operations. Unnecessarily committing causes extra database load, and can destroy transactional consistency.
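The recommendation to enable autocommit only for the last DML statement can be sketched as follows. RecordingConnection is a hypothetical stand-in (not part of python-oracledb) that records the autocommit setting in force for each statement, so the example runs without a database; the table and column names are also invented for illustration.

```python
import asyncio

class RecordingConnection:
    """Hypothetical stand-in for an AsyncConnection; it only records calls."""

    def __init__(self):
        self.autocommit = False
        self.log = []

    async def execute(self, statement):
        # Record the autocommit setting in force when the statement runs
        mode = "autocommit on" if self.autocommit else "autocommit off"
        self.log.append((statement, mode))

async def move_funds(connection):
    # First DML of the transaction: executed without committing
    await connection.execute("update accounts set balance = balance - 10 where id = 1")
    # Last DML: enable autocommit so no separate commit call is needed
    connection.autocommit = True
    try:
        await connection.execute("update accounts set balance = balance + 10 where id = 2")
    finally:
        # Restore the default so later statements are not committed unexpectedly
        connection.autocommit = False

connection = RecordingConnection()
asyncio.run(move_funds(connection))
for statement, mode in connection.log:
    print(mode, "-", statement)
```

With a real connection, both updates would become permanent together when the second statement executes, and a failure in the first statement would leave nothing committed.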
22.2. Pipelining Database Operations
Pipelining allows an application to send multiple, independent statements to Oracle Database with one call. The database can be kept busy without waiting for the application to receive a result set and send the next statement. While the database processes the pipeline of statements, the application can continue with non-database work. When the database has executed all the pipelined operations, their results are returned to the application.
Pipelined operations are executed sequentially by the database. They do not execute concurrently. It is local tasks that can be executed at the same time the database is working.
Effective use of Oracle Database Pipelining can increase the responsiveness of
an application and improve overall system throughput. Pipelining is useful when
many small operations are being performed in rapid succession. It is most
beneficial when the network to the database is slow. This is because of its
reduction in round-trips compared with those required if
the equivalent SQL statements were individually executed with calls like
AsyncCursor.execute().
Pipelining is only supported in python-oracledb Thin mode with asyncio.
See Oracle Call Interface Pipelining for more information about Oracle Database Pipelining.
Note
True pipelining only occurs when you are connected to Oracle Database 23ai.
When you connect to an older database, operations are sequentially executed by python-oracledb. Each operation concludes before the next is sent to the database. There is no reduction in round-trips and no performance benefit. This usage is only recommended for code portability such as when preparing for a database upgrade.
22.2.1. Using Pipelines
To create a pipeline to process a set of database
operations, use oracledb.create_pipeline().
pipeline = oracledb.create_pipeline()
You can then add various operations to the pipeline using
add_callfunc(), add_callproc(), add_commit(), add_execute(),
add_executemany(), add_fetchall(), add_fetchmany(), and add_fetchone().
For example:
pipeline.add_execute("insert into mytable (mycol) values (1234)")
pipeline.add_fetchone("select user from dual")
pipeline.add_fetchmany("select employee_id from employees", num_rows=20)
Note that queries that return results are added with the fetch methods rather than with add_execute().
Only one set of query results can be returned from each query operation. For
example, add_fetchmany() will only fetch the first set of
query records, up to the limit specified by the method’s num_rows
parameter. Similarly, for add_fetchone() only the first row
can ever be fetched. It is not possible to fetch more data from these
operations. To prevent the database processing rows that cannot be fetched by
the application, consider adding appropriate WHERE conditions or using a
FETCH NEXT clause in the statement; see Limiting Rows.
Query results or OUT binds from one operation cannot be passed to subsequent operations in the same pipeline.
To execute the pipeline, call AsyncConnection.run_pipeline().
results = await connection.run_pipeline(pipeline)
The operations are all sent to the database and executed. The method returns a list of PipelineOpResult objects, one entry per operation. The objects contain information about the execution of the relevant operation, such as any error number, PL/SQL function return value, or any query rows and column metadata.
The Connection.call_timeout value has no effect on pipeline operations.
To limit the time for a pipeline, use an asyncio timeout, available
from Python 3.11.
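A minimal sketch of the timeout approach is shown below. The slow_pipeline() coroutine is a hypothetical stand-in for awaiting connection.run_pipeline(pipeline) so that the example runs without a database, and the fallback to asyncio.wait_for() for older Python versions is an addition made here for portability.

```python
import asyncio

async def slow_pipeline():
    # Hypothetical stand-in for: await connection.run_pipeline(pipeline)
    await asyncio.sleep(5)
    return ["results"]

async def run_with_limit(seconds):
    try:
        if hasattr(asyncio, "timeout"):
            # asyncio.timeout() is available from Python 3.11
            async with asyncio.timeout(seconds):
                return await slow_pipeline()
        # Older Python versions: wait_for() gives a similar effect
        return await asyncio.wait_for(slow_pipeline(), seconds)
    except asyncio.TimeoutError:
        # The awaited call was cancelled because the limit was reached
        return None

if __name__ == "__main__":
    print(asyncio.run(run_with_limit(0.05)))
```

Here the stand-in takes far longer than the limit, so run_with_limit() returns None instead of the results list.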
To tune fetching of rows with Pipeline.add_fetchall(), set
defaults.arraysize or pass the arraysize parameter.
22.2.1.1. Pipelining Examples
An example of pipelining is:
import asyncio
import oracledb

async def main():
    # Create a pipeline and define the operations
    pipeline = oracledb.create_pipeline()
    pipeline.add_fetchone("select temperature from weather")
    pipeline.add_fetchall("select name from friends where active = true")
    pipeline.add_fetchmany("select story from news order by popularity", num_rows=5)

    connection = await oracledb.connect_async(user="hr", password=userpwd,
                                              dsn="localhost/orclpdb")

    # Run the operations in the pipeline
    result_1, result_2, result_3 = await connection.run_pipeline(pipeline)

    # Print the database responses
    print("Current temperature:", result_1.rows)
    print("Active friends:", result_2.rows)
    print("Top news stories:", result_3.rows)

    await connection.close()

asyncio.run(main())
See pipelining_basic.py for a runnable example.
To allow an application to continue with non-database work before processing any responses from the database, use code similar to:
import asyncio
import oracledb

async def run_thing_one():
    return "thing_one"

async def run_thing_two():
    return "thing_two"

async def main():
    connection = await oracledb.connect_async(user="hr", password=userpwd,
                                              dsn="localhost/orclpdb")
    pipeline = oracledb.create_pipeline()
    pipeline.add_fetchone("select user from dual")
    pipeline.add_fetchone("select sysdate from dual")

    # Run the pipeline and non-database operations concurrently
    return_values = await asyncio.gather(
        run_thing_one(), run_thing_two(), connection.run_pipeline(pipeline)
    )

    for r in return_values:
        if isinstance(r, list):  # the pipeline return list
            for result in r:
                if result.rows:
                    for row in result.rows:
                        print(*row, sep="\t")
        else:
            print(r)             # a local operation result

    await connection.close()

asyncio.run(main())
Output will be like:
thing_one
thing_two
HR
2024-10-29 03:34:43
See pipelining_parallel.py for a runnable example.
22.2.2. Using OUT Binds with Pipelines
To fetch OUT binds from executed statements, create an explicit
cursor and use Cursor.var(). These variables are associated with the
connection and can be used by the other cursors created internally for each
pipelined operation. For example:
cursor = connection.cursor()
v1 = cursor.var(oracledb.DB_TYPE_BOOLEAN)
v2 = cursor.var(oracledb.DB_TYPE_VARCHAR)

pipeline = oracledb.create_pipeline()
pipeline.add_execute("""
    begin
        :1 := true;
        :2 := 'Python';
    end;
    """, [v1, v2])
pipeline.add_fetchone("select 1234 from dual")

results = await connection.run_pipeline(pipeline)

for r in results:
    if r.rows:
        print(r.rows)

print(v1.getvalue(), v2.getvalue())
This prints:
[(1234,)]
True Python
OUT binds from one operation cannot be used in subsequent operations. For
example, the following would print only True because the WHERE condition of
the SQL statement is not matched:
cursor = connection.cursor()
v1 = cursor.var(oracledb.DB_TYPE_BOOLEAN)

pipeline = oracledb.create_pipeline()
pipeline.add_execute("""
    begin
        :1 := TRUE;
    end;
    """, [v1])
pipeline.add_fetchone("select 1234 from dual where :1 = TRUE", [v1])

results = await connection.run_pipeline(pipeline)

for r in results:
    if r.rows:
        print(r.rows)

print(v1.getvalue())  # prints True
22.2.3. Pipeline Error Handling
The continue_on_error parameter to AsyncConnection.run_pipeline()
determines whether subsequent operations should continue to run after a failure
in one operation has occurred. When set to the default value False, if any
error is returned in any operation in the pipeline then the database terminates
all subsequent operations.
For example:
# Stop on error
pipeline.add_fetchall("select 1234 from does_not_exist")
pipeline.add_fetchone("select 5678 from dual")
r1, r2 = await connection.run_pipeline(pipeline)
will only execute the first operation and will throw the failure message:
oracledb.exceptions.DatabaseError: ORA-00942: table or view "HR"."DOES_NOT_EXIST" does not exist
Help: https://docs.oracle.com/error-help/db/ora-00942/
whereas this code:
# Continue on error
pipeline.add_fetchall("select 1234 from does_not_exist")
pipeline.add_fetchone("select 5678 from dual")
r1, r2 = await connection.run_pipeline(pipeline, continue_on_error=True)
print(r1.error)
print(r2.rows)
will execute all operations and will display:
ORA-00942: table or view "HR"."DOES_NOT_EXIST" does not exist
Help: https://docs.oracle.com/error-help/db/ora-00942/
[(5678,)]
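When running with continue_on_error=True, an application typically needs to separate the operations that failed from those that returned rows. The helper below is a hypothetical sketch of one way to do that, relying only on the error and rows attributes used in the example above. It assumes that error is None for an operation that succeeded, and it uses SimpleNamespace objects as stand-ins for PipelineOpResult so that it runs without a database.

```python
from types import SimpleNamespace

def split_results(results):
    """Separate pipeline results into successful row sets and error messages."""
    rows, errors = [], []
    for position, result in enumerate(results, start=1):
        if result.error is not None:
            errors.append(f"operation {position}: {result.error}")
        else:
            rows.append(result.rows)
    return rows, errors

# Stand-in objects mimicking the 'error' and 'rows' attributes shown above
fake_results = [
    SimpleNamespace(error="ORA-00942: table or view does not exist", rows=None),
    SimpleNamespace(error=None, rows=[(5678,)]),
]

rows, errors = split_results(fake_results)
print(rows)
print(errors)
```

With a real connection, the list returned by run_pipeline() would be passed to split_results() in place of fake_results.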
PL/SQL Compilation Warnings
PL/SQL Compilation Warnings can be identified by checking the PipelineOpResult
attribute PipelineOpResult.warning. For example:
pipeline.add_execute(
    """create or replace procedure myproc as
       begin
           bogus;
       end;"""
)
(result,) = await connection.run_pipeline(pipeline)
print(result.warning.full_code)
print(result.warning)
will print:
DPY-7000
DPY-7000: creation succeeded with compilation errors
See pipelining_error.py for a runnable example showing warnings and errors.
22.2.4. Pipeline Cursor Usage
For each operation added to a pipeline, with the exception of
Pipeline.add_commit(), a cursor will be opened when
AsyncConnection.run_pipeline() is called. For example, the following
code will open two cursors:
pipeline = oracledb.create_pipeline()
pipeline.add_execute("insert into t1 (c1) values (1234)")
pipeline.add_fetchone("select user from dual")
await connection.run_pipeline(pipeline)
Make sure your pipeline length does not exceed your cursor limit. Set the database parameter open_cursors appropriately.
22.2.5. Pipeline Round-trips
The complete set of operations in a pipeline will be performed in a single
round-trip when AsyncConnection.run_pipeline() is called, with the
following exceptions:
Queries that contain LOBs require an additional round-trip
Queries that contain DbObject values may require multiple round-trips
Queries with add_fetchall() may require multiple round-trips
The reduction in round-trips is the main contributor to pipelining’s performance improvement in comparison to explicitly executing the equivalent SQL statements individually. With high-speed networks there may be little performance benefit to using pipelining; however, the database and network efficiencies can help overall system scalability.
Note that the traditional method of monitoring round-trips by taking snapshots of the V$SESSTAT view is not accurate for pipelines.
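To get a feel for why the round-trip reduction matters more on slow networks, the following back-of-the-envelope model compares the two approaches. It is a deliberately simplified sketch, not a measurement: the function name and all figures are illustrative assumptions, each individually executed statement is assumed to cost one round-trip plus its execution time, and the pipelined case is assumed to need a single round-trip (ignoring the exceptions listed above).

```python
def estimated_times_ms(num_statements, round_trip_ms, exec_ms):
    """Return (individual, pipelined) elapsed-time estimates in milliseconds."""
    # Individually executed: every statement pays a network round-trip
    individual = num_statements * (round_trip_ms + exec_ms)
    # Pipelined: one round-trip, then the statements execute sequentially
    pipelined = round_trip_ms + num_statements * exec_ms
    return individual, pipelined

# Compare a fast local network with a slow wide-area link
for rtt in (0.2, 20.0):
    individual, pipelined = estimated_times_ms(50, rtt, 1.0)
    print(f"round-trip {rtt} ms: individual ~{individual:.0f} ms, "
          f"pipelined ~{pipelined:.0f} ms")
```

Under these assumptions the two estimates are close when the round-trip time is small relative to execution time, and they diverge sharply as the round-trip time grows.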