Implementing asynchrony in Python with the asyncio module

by Alex

Asynchronous programming is a feature of modern programming languages that lets a program start an operation without waiting for it to complete. Asynchrony is one of the main reasons for the popularity of Node.js. Imagine a web search application that opens a thousand connections. You could open a connection, wait for the result, and only then move on to the next one. But that greatly increases the latency of the program: opening a connection is an operation that takes time, and while it runs, all subsequent operations sit waiting. Asynchrony, on the other hand, provides a way to open thousands of connections simultaneously and switch between them: open a connection, move on to the next one while waiting for a response from the first, and continue until every connection has returned a result. In the example illustrated by the chart, the synchronous approach takes 45 seconds, whereas with asynchrony the execution time drops to about 20 seconds.

Where does asynchrony apply in the real world?

Asynchrony is best suited for these scenarios:

  1. The program takes too long to execute.
  2. The delay is caused not by computation, but by waiting for input or output.
  3. The work involves many simultaneous input and output operations.

These could be:

  • Parsers,
  • Network services.

The difference between the concepts of parallelism, concurrency, threading, and asynchrony

  • Parallelism is the execution of multiple operations at the same time. Multiprocessing is one example; it is great for CPU-intensive tasks.
  • Concurrency is a broader concept: multiple tasks run with overlapping lifetimes, not necessarily at the same instant.
  • Threading – a thread is a separate flow of execution. A single process can contain several threads, each running independently; great for I/O-bound operations.
  • Asynchrony – a single-threaded, single-process design that uses cooperative multitasking. In other words, asynchrony gives the impression of concurrency while using a single thread in a single process.
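To make the last point concrete, here is a minimal sketch (the coroutine name wait_one and the one-second delay are made up for illustration) showing that asyncio overlaps waits on a single thread of a single process:

```python
import asyncio
import time

async def wait_one(name):
    # Simulates an I/O wait: during the sleep, control returns to the event loop.
    await asyncio.sleep(1)
    return name

async def main():
    start = time.perf_counter()
    # Three one-second waits overlap, so the whole run takes about one second,
    # even though everything happens in a single thread of a single process.
    results = await asyncio.gather(wait_one('a'), wait_one('b'), wait_one('c'))
    print(results, f'{time.perf_counter() - start:.1f}s')

asyncio.run(main())
```

Run sequentially, the three waits would take about three seconds; the event loop brings that down to roughly one.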

Components of asynchronous programming

Let’s break down the different components of asynchronous programming in detail. We will also use code for clarity.

Coroutines

Coroutines are generalized forms of subroutines. They are used for cooperative multitasking and behave like Python generators. A coroutine is defined with the async keyword; inside it, the await keyword suspends the coroutine and passes control back to the event loop. To run a coroutine, you must schedule it on the event loop. Scheduled coroutines are wrapped in Tasks, which are a kind of Future object.

An example of a coroutine

In the code below the async_func function is called from the main function. You need to add the await keyword to the call of an asynchronous function: without await, calling async_func only creates a coroutine object and does not run it.

import asyncio

async def async_func():
    print('Running ...')
    await asyncio.sleep(1)
    print('... Done!')

async def main():
    async_func()  # this code won't return anything
    await async_func()

asyncio.run(main())
Output: Warning (from warnings module):
  File "AppData\Local\Programs\Python\Python38\main.py", line 8
    async_func()  # this code won't return anything
RuntimeWarning: coroutine 'async_func' was never awaited
Running ...
... Done!

Tasks

Tasks are used to schedule coroutines to run concurrently. When a coroutine is passed to the event loop for processing, you get back a Task object, which provides a way to control the coroutine's behavior from outside the event loop.

An example of a task

In the code below a task is created with asyncio.create_task (a built-in function of the asyncio library) and then awaited.
import asyncio

async def async_func():
    print('Running ...')
    await asyncio.sleep(1)
    print('... Done!')

async def main():
    task = asyncio.create_task(async_func())
    await task

asyncio.run(main())
Output: Running ...
... Done!

Event loops

This mechanism runs the coroutines until they complete. You can think of it as a while True loop that keeps track of the coroutines and knows when they are idle, so that something else can be done in the meantime. It wakes up an idle coroutine when that coroutine's turn to execute comes. Only one event loop can run at a time in a given thread.

An example of an event loop

Next, three tasks are created and added to a list. They are executed asynchronously using get_event_loop, create_task, and await from the asyncio library.
import asyncio

async def async_func(task_no):
    print(f'{task_no}: Run ...')
    await asyncio.sleep(1)
    print(f'{task_no}: ... Done!')

async def main():
    taskA = loop.create_task(async_func('taskA'))
    taskB = loop.create_task(async_func('taskB'))
    taskC = loop.create_task(async_func('taskC'))
    await asyncio.wait([taskA, taskB, taskC])

if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except Exception:
        pass
Output: taskA: Run ...
taskB: Run ...
taskC: Run ...
taskA: ... Done!
taskB: ... Done!
taskC: ... Done!
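Since Python 3.7 the same three tasks can be run without handling the loop object directly: asyncio.run creates and closes the event loop for you, and asyncio.gather schedules the coroutines as tasks. A sketch of the equivalent code:

```python
import asyncio

async def async_func(task_no):
    print(f'{task_no}: Run ...')
    await asyncio.sleep(1)
    print(f'{task_no}: ... Done!')

async def main():
    # gather wraps each coroutine in a Task and waits for all of them
    await asyncio.gather(async_func('taskA'),
                         async_func('taskB'),
                         async_func('taskC'))

asyncio.run(main())  # creates the event loop, runs main(), then closes the loop
```

This avoids both the module-level loop variable and the bare try/except around get_event_loop.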

Future

Future is a special low-level object that represents the eventual result of an asynchronous operation. When a coroutine awaits a Future, it is suspended until the Future is resolved somewhere else.
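As a minimal sketch (the helper name set_after and the one-second delay are made up for illustration), a Future can be created on the running loop, resolved from another task, and awaited:

```python
import asyncio

async def set_after(fut, delay, value):
    # Resolve the future "elsewhere" after a delay.
    await asyncio.sleep(delay)
    fut.set_result(value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()              # low-level Future bound to this loop
    asyncio.create_task(set_after(fut, 1, 'done'))
    print(await fut)                        # suspends until set_result() is called

asyncio.run(main())
```

In everyday code you rarely create Futures by hand; Tasks (which subclass Future) cover most needs, as in the benchmark below where ensure_future and result() are used.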

A comparison between multithreading and asynchrony

Before turning to asynchrony, let's test multithreading for performance and compare the results. For this test we will fetch data from the same URL repeatedly: 1, 10, 50, 100, and 500 times. Then we will compare the performance of the two approaches.

Implementation

Multithreading:
import requests
import time
from concurrent.futures import ProcessPoolExecutor

def fetch_url_data(pg_url):
    try:
        resp = requests.get(pg_url)
    except Exception as e:
        print(f"An error occurred while fetching data from url: {pg_url}")
    else:
        return resp.content

def get_all_url_data(url_list):
    with ProcessPoolExecutor() as executor:
        resp = executor.map(fetch_url_data, url_list)
    return resp

if __name__ == '__main__':
    url = "https://www.uefa.com/uefaeuro-2020/"
    for ntimes in [1, 10, 50, 100, 500]:
        start_time = time.time()
        responses = get_all_url_data([url] * ntimes)
        print(f'Retrieved {ntimes} query results in {time.time() - start_time} seconds')
Output: Retrieved 1 query results in 0.913393939743041992 seconds
Retrieved 10 query results in 1.7160518169403076 seconds
Retrieved 50 query results in 3.842841625213623 seconds
Retrieved 100 query results in 7.662721633911133 seconds
Retrieved 500 query results in 32.575703620910645 seconds
ProcessPoolExecutor is a class from Python's concurrent.futures module that implements the Executor interface using a pool of worker processes (ThreadPoolExecutor is its thread-based counterpart). fetch_url_data is a function that fetches URL data using the requests library. get_all_url_data then maps fetch_url_data over the list of URLs.

Asynchrony:
import asyncio
import time
from aiohttp import ClientSession, ClientResponseError

async def fetch_url_data(session, url):
    try:
        async with session.get(url, timeout=60) as response:
            resp = await response.read()
    except Exception as e:
        print(e)
    else:
        return resp

async def fetch_async(loop, r):
    url = "https://www.uefa.com/uefaeuro-2020/"
    tasks = []
    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.ensure_future(fetch_url_data(session, url))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    return responses

if __name__ == '__main__':
    for ntimes in [1, 10, 50, 100, 500]:
        start_time = time.time()
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(fetch_async(loop, ntimes))
        # will run until it completes or an error occurs
        loop.run_until_complete(future)
        responses = future.result()
        print(f'Retrieved {ntimes} query results in {time.time() - start_time} seconds')
Output: Retrieved 1 query results in 0.41477298736572266 seconds
Retrieved 10 query results in 0.46897053718566895 seconds
Retrieved 50 query results in 2.3057644367218018 seconds
Retrieved 100 query results in 4.6860511302948 seconds
Retrieved 500 query results in 18.013994455337524 seconds
The get_event_loop function is used to obtain the event loop, and ensure_future schedules each request as a task on it. The fetch_async function adds the tasks to the event loop, while fetch_url_data reads the URL data through the aiohttp ClientSession. The future.result() method returns the responses of all the tasks.

Results

As you can see, for this program asynchronous programming is roughly twice as fast as the pool-based approach: about 18 seconds versus about 33 seconds for 500 requests.

Conclusions

Asynchronous programming shows better performance here by overlapping I/O waits within a single thread, rather than by paying the overhead of multiple threads or processes. It is worth using in programs where this kind of I/O concurrency can be applied.
