DISCOVERY

November 1st, 2020

Concurrency and Parallelism Concepts Translated into Python


Parallelism, multithreading, multi-process programming, asynchronous programming: concepts dealing with concurrency are often the most difficult to master when learning a new programming language. There are often many different approaches available, and it's hard to know which is best (look no further than Java Concurrency in Practice, a 350-page book on writing proper concurrent code in Java). Python is no different, with multiple evolving libraries and, for added confusion, a global interpreter lock (GIL) which restricts Python bytecode to a single thread at a time when running on the default CPython interpreter. In this article, I attempt to demystify concurrent programming in Python and work with libraries such as concurrent.futures, asyncio, and aiohttp.

Ironically, the best place to start learning how concurrency works is to analyze non-concurrent, sequential code. Sequential code executes one task (instruction) after another. Each sequential program runs as a single process, which is an instance of a computer program. In theory, a process running sequential code executes in its entirety on a single core of a CPU without giving up control and allowing other processes to run concurrently (during the same time period). In reality, the operating system managing computer processes will likely perform context switches during a program's execution, allowing other processes to use the CPU concurrently [1]. For the purposes of this article I won't focus on the operating-system-level concurrency of our processes, just the code written in our program.

The following Python code performs API requests that are executed sequentially. It makes an HTTP call to the API, waits for a response, processes the response, and then makes the next API call.

import time

import requests

domain = 'https://jsonplaceholder.typicode.com/'
endpoints = ['posts', 'comments', 'albums', 'photos', 'todos', 'users']


def make_requests():
    for endpoint in endpoints:
        url = f'{domain}{endpoint}'
        print(url)
        response = requests.get(url)
        print(response.status_code)


def main():
    start = time.time()
    make_requests()
    end = time.time()
    print(f'API calls made in: {end - start}')


if __name__ == '__main__':
    main()

There are a few issues with this approach, the most glaring being that time is wasted while the program awaits a response from the API. Since each API call is a network I/O task to a remote server (potentially located thousands of miles away), the response could take anywhere from milliseconds to minutes. Either way, that is a lot of time that could be spent executing other tasks, such as another API call.

A better approach is to utilize concurrency, which Python provides multiple ways to achieve.

Concurrent programming is when multiple computations are executed during the same time period [2]. Programs running concurrently are sometimes also running in parallel, but not always. Programs running in parallel are executing at the same instant, either on separate CPUs or on different cores of a single CPU. Programs written to utilize parallelism can also be described as running concurrently (just not necessarily vice versa).

Concurrency vs. Parallelism

Concurrency

In programming, concurrency is when two or more programs are executing during the same time period. In a single CPU architecture these programs share the CPU, so their executions are interleaved. For example, program A runs for one second, then gives up the CPU to program B, which runs for two seconds. In a multiprocessor or multi-core architecture, these processes can run in parallel.

It’s important to note that concurrency does not imply parallelism. Computers are able to give the illusion that multiple programs are running simultaneously (in parallel) on the CPU without actually doing so. This is achieved with time sharing and context switching. On architectures with a single CPU and single core it is simply impossible to perform tasks in parallel.
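To make this concrete, here is a minimal sketch (the thread names and step counts are arbitrary) where two threads share the interpreter. Sleeping gives up the CPU, so their output interleaves even though nothing runs in parallel under CPython's GIL:

import threading
import time


def worker(name: str) -> None:
    # Sleeping yields the CPU, so the two threads' steps interleave.
    for step in range(3):
        print(f'{name}: step {step}')
        time.sleep(0.1)


first = threading.Thread(target=worker, args=('A',))
second = threading.Thread(target=worker, args=('B',))
first.start()
second.start()
first.join()
second.join()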

Parallelism

In programming, parallel computing is when two or more programs are executing simultaneously. For example, program A and program B start at the same time on different processors, with program A completing in one second and program B completing in two seconds. Computer programs can spawn additional processes or threads which can run in parallel on a separate processor from the main process/thread.

By definition, computations which run in parallel also run concurrently. Parallel processing is hardware dependent: software with separate threads or processes designed to run on separate processors doesn't necessarily do so. If a computer's hardware is limited to a single CPU core, those threads and processes will simply run concurrently on the same processor.
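As a quick sketch of this hardware dependence (the workload here is trivial and purely illustrative), Python's multiprocessing module can spread work across processes, which run in parallel only if multiple cores are available:

import os
from multiprocessing import Pool


def square(n: int) -> int:
    # Each worker process computes independently, potentially on its own core.
    return n * n


if __name__ == '__main__':
    # On a multi-core machine the workers can run in parallel; on a
    # single-core machine the same code simply runs concurrently.
    with Pool(processes=os.cpu_count()) as pool:
        print(pool.map(square, range(10)))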

With the concepts of concurrency and parallelism in mind, I began refactoring the Python code which makes API calls. Python provides the concurrent.futures library, which allows computations to be executed asynchronously using either threads or processes.

Asynchronous Programming

Asynchronous programming is a form of concurrent programming that awaits events in a non-blocking fashion. In other words, asynchronous code allows other pieces of code to execute concurrently (and potentially in parallel) while it waits for external events to occur (such as API responses, filesystem I/O events, mouse clicks, or the completion of long mathematical computations). Many high-level programming languages have built-in support for asynchronous programming.

concurrent.futures contains a ThreadPoolExecutor class which can run asynchronous tasks in separate threads (although with limitations, because of CPython's Global Interpreter Lock [GIL], which I will explain shortly). Using ThreadPoolExecutor can significantly speed up the sequential API calls I wrote previously.

import time
from concurrent import futures

import requests

domain = 'https://jsonplaceholder.typicode.com/'
endpoints = ['posts', 'comments', 'albums', 'photos', 'todos', 'users']


def make_request(endpoint: str) -> requests.Response:
    """
    Make an API request to an endpoint on the globally defined domain name.
    Throws an exception if the response has an error status code.
    :param endpoint: The endpoint on the API to make a GET request to.
    :return: The response object of the API call.
    """
    url = f'{domain}{endpoint}'
    response: requests.Response = requests.get(url)
    print(response)

    # Raise an error if the HTTP code is 4XX or 5XX.
    response.raise_for_status()
    return response


def make_requests() -> int:
    """
    Make requests to the API from a pool of threads running concurrently
    (although with limitations, because Python's Global Interpreter Lock
    [GIL] only allows Python code to run in a single thread at a time.
    However, I/O bound tasks release the GIL while they wait, allowing
    ThreadPoolExecutor to be faster than making the API calls synchronously).
    On my machine, this is approximately 2.5x faster than using requests
    to make API calls sequentially.
    """
    workers = 5
    with futures.ThreadPoolExecutor(workers) as executor:
        # make_request() calls will be made concurrently.
        res = executor.map(make_request, endpoints)
    return len(list(res))


def main() -> None:
    start = time.time()
    make_requests()
    end = time.time()
    print(f'API calls from worker threads made in: {end - start}')


if __name__ == '__main__':
    main()

This code creates a thread pool with five workers, so that up to five API calls can be made concurrently.

As I mentioned, the CPython interpreter has a Global Interpreter Lock (GIL). The GIL allows only a single thread to run Python bytecode at a time, meaning that Python code is able to create multiple threads, but not run them in parallel [3]. A thread must acquire the lock to run Python code, and the interpreter forces threads to release it periodically (or whenever they block on I/O) so that other threads get a turn. Therefore, when using the default CPython interpreter, the ThreadPoolExecutor class can be a bit misleading: due to the GIL, the ThreadPoolExecutor example runs concurrently but not in parallel.
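A quick way to observe the GIL is to time a CPU-bound task run sequentially and then split across two threads. This is a sketch; the iteration count is arbitrary and exact timings vary by machine, but the threaded version won't be meaningfully faster:

import time
from concurrent import futures


def count_down(n: int) -> None:
    # Purely CPU-bound work: the GIL is held while this loop runs.
    while n > 0:
        n -= 1


N = 20_000_000

start = time.time()
count_down(N)
print(f'sequential: {time.time() - start:.2f}s')

start = time.time()
with futures.ThreadPoolExecutor(2) as executor:
    # Two threads each do half the work, yet the total time is roughly
    # the same (or worse) because only one thread can hold the GIL.
    list(executor.map(count_down, [N // 2, N // 2]))
print(f'two threads: {time.time() - start:.2f}s')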

Thread

A thread, also known as a lightweight process, is the most basic unit of scheduling in most computers [4]. A computer process is initially created with a single thread of execution, known as the main thread. The main thread has the ability to split off and create one or more additional threads. A thread can be thought of as a subset of a process [5]. Threads share the memory space of their parent process, which is both beneficial and dangerous when writing multithreaded programs. The benefit is that variables which are global to the process can be utilized by all threads. The danger is that race conditions can occur when multiple threads access and modify values in their shared memory space.
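Here is a minimal sketch of that danger (the thread count and iteration count are arbitrary): four threads increment a shared variable, and a Lock is required to keep the read-modify-write sequence from losing updates:

import threading

counter = 0
lock = threading.Lock()


def increment() -> None:
    global counter
    for _ in range(100_000):
        # counter += 1 is a read-modify-write sequence, not an atomic
        # operation, so without the lock concurrent updates can be lost.
        with lock:
            counter += 1


threads = [threading.Thread(target=increment) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Always 400000 with the lock; removing it risks a smaller, unpredictable total.
print(counter)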

For I/O bound tasks like making API calls, asynchronous programming approaches like ThreadPoolExecutor can still speed up the code [6]. This is because while an asynchronous call awaits an event, its thread releases the GIL, allowing other threads to acquire it and run Python code. This is what allows API calls made with the ThreadPoolExecutor to run faster than the sequential code. Unfortunately, for CPU-bound programs, using multiple threads does not speed up execution times.

Python does provide a way to optimize CPU-bound tasks: instead of creating multiple threads and running tasks in each, multiple processes can be created. concurrent.futures has a ProcessPoolExecutor class which provides this functionality. The API code can be rewritten again to use ProcessPoolExecutor (note: using processes instead of threads is less useful for I/O-bound tasks like API calls).

import time
import os
from concurrent import futures

import requests

domain = 'https://jsonplaceholder.typicode.com/'
endpoints = ['posts', 'comments', 'albums', 'photos', 'todos', 'users']


def make_request(endpoint: str) -> requests.Response:
    url = f'{domain}{endpoint}'
    response: requests.Response = requests.get(url)
    print(response)

    # Raise an error if the HTTP code is 4XX or 5XX.
    response.raise_for_status()
    return response


def make_requests_processes() -> int:
    cpu_cores = os.cpu_count()
    print(f'Number of CPU cores: {cpu_cores}')

    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(make_request, endpoints)
    return len(list(res))


def main() -> None:
    start = time.time()
    make_requests_processes()
    end = time.time()
    print(f'API calls from worker processes made in: {end - start}')


if __name__ == '__main__':
    main()
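Since API calls are I/O bound, ProcessPoolExecutor shows little benefit in the example above. A sketch with a CPU-bound task (the workload sizes are arbitrary, and the speedup depends on the number of cores) demonstrates where worker processes actually pay off:

import time
from concurrent import futures


def count_down(n: int) -> None:
    # CPU-bound work that benefits from running on separate cores.
    while n > 0:
        n -= 1


if __name__ == '__main__':
    work = [10_000_000] * 4

    start = time.time()
    for n in work:
        count_down(n)
    print(f'sequential: {time.time() - start:.2f}s')

    start = time.time()
    with futures.ProcessPoolExecutor() as executor:
        # Each worker process has its own interpreter and its own GIL, so
        # on a multi-core machine the four tasks truly run in parallel.
        list(executor.map(count_down, work))
    print(f'process pool: {time.time() - start:.2f}s')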

Python provides many libraries for asynchronous programming, but one of the newer and more significant libraries is asyncio.

With asyncio, the keywords async and await can be used. asyncio is a high-level library, abstracting away implementation details (such as its event loop and task scheduling). This design decision makes it easy to use and a preferred way to implement asynchronous programs.

Coroutine

A Python coroutine is a function defined with the async def keywords. Coroutines can be suspended and restarted many times throughout their lifecycle, making them ideal for asynchronous programming [7]. They can also receive data and return data throughout their lifecycle.
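asyncio normally drives coroutines for you, but a low-level sketch (the Suspend awaitable below is purely illustrative) makes this lifecycle visible by driving a coroutine by hand:

class Suspend:
    def __await__(self):
        # Yielding suspends the coroutine; the value sent back in resumes it.
        received = yield 'suspended'
        return received


async def coroutine() -> str:
    data = await Suspend()
    return f'resumed with {data!r}'


coro = coroutine()
print(coro.send(None))   # 'suspended' -- the coroutine is now paused
try:
    coro.send('hello')   # resume it, passing data back in
except StopIteration as result:
    print(result.value)  # "resumed with 'hello'"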

asyncio is a massive library and deserves its own article, but the following code snippet demonstrates the basics.

import asyncio
import time
from typing import List, Tuple


async def predicted_sunday_run_length() -> float:
    """
    Simulate a long running operation that predicts the length of my Sunday
    long run.
    """
    await asyncio.sleep(1)
    return 12.31


async def predicted_weekly_mileage() -> float:
    """
    Simulate a long running operation that predicts my weekly running mileage.
    """
    await asyncio.sleep(2)
    return 44.0


async def running_predictions() -> Tuple[float, float]:
    """
    Using async/await, call both the simulated functions.
    """
    sunday_run = await predicted_sunday_run_length()
    weekly_mileage = await predicted_weekly_mileage()
    return sunday_run, weekly_mileage


async def running_predictions_concurrent() -> List[float]:
    """
    The same as running_predictions() above, except it uses asyncio.gather()
    instead of two await statements.
    """
    return await asyncio.gather(
        predicted_weekly_mileage(), predicted_sunday_run_length()
    )


def main() -> None:
    start = time.time()
    sunday_run, weekly_mileage = asyncio.run(running_predictions())
    end = time.time()

    print(sunday_run)
    print(weekly_mileage)

    # Two awaits complete in 3 seconds.
    print(f'two awaits completes in: {end - start}')

    start = time.time()
    predictions = asyncio.run(running_predictions_concurrent())
    end = time.time()

    print(predictions[0])
    print(predictions[1])

    # await asyncio.gather() completes in 2 seconds.
    print(f'await asyncio.gather() completes in: {end - start}')


if __name__ == '__main__':
    main()

This code demonstrates how asynchronous tasks created with asyncio can be run sequentially (running_predictions()) or concurrently (running_predictions_concurrent()). Coroutines defined with async def can be executed by passing them to asyncio.run().

Beyond the base Python language and standard libraries, third-party modules are freely available for use. There are many great third-party libraries which build upon asyncio. One such library is aiohttp, an asynchronous library for making HTTP requests [8]. With aiohttp, the Python API code I’ve shown throughout this article can receive one final refactoring.

import asyncio
import time
from typing import Any, Tuple

import aiohttp


async def make_request(session: aiohttp.ClientSession, endpoint: str) -> Tuple[Any, int]:
    """
    Make an API request to an endpoint on a mocked REST API.
    :param session: An aiohttp session interface for making HTTP requests.
    :param endpoint: The endpoint on the API to make a GET request to.
    :return: The response JSON of the API call and the HTTP status code.
    """
    url = f'https://jsonplaceholder.typicode.com/{endpoint}'
    async with session.get(url) as response:
        return await response.json(), response.status


async def make_requests() -> int:
    """
    Make HTTP requests using aiohttp.
    :return: The number of successful API calls made (with 200 HTTP codes).
    """
    endpoints = ['posts', 'comments', 'albums', 'photos', 'todos', 'users']
    success_count = 0

    async with aiohttp.ClientSession() as session:
        for endpoint in endpoints:
            response: Tuple[Any, int] = await make_request(session, endpoint)
            print(response[1])
            if response[1] == 200:
                success_count += 1

    return success_count


def main() -> None:
    start = time.time()
    loop = asyncio.get_event_loop()
    success_count = loop.run_until_complete(make_requests())
    end = time.time()
    print(f'API calls completed in: {end - start}')
    print(f'Success count: {success_count}')


if __name__ == '__main__':
    main()
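One thing worth noting: make_requests() above still awaits each API call one at a time inside its loop. As a sketch that drops into the script above (make_requests_concurrent() is a hypothetical variant, reusing the make_request() coroutine), asyncio.gather() can put all six requests in flight at once:

async def make_requests_concurrent() -> int:
    """A sketch: issue all of the API calls concurrently with asyncio.gather()."""
    endpoints = ['posts', 'comments', 'albums', 'photos', 'todos', 'users']
    async with aiohttp.ClientSession() as session:
        # make_request() is the coroutine defined in the script above.
        responses = await asyncio.gather(
            *(make_request(session, endpoint) for endpoint in endpoints)
        )
    return sum(1 for _, status in responses if status == 200)

Swapping this coroutine into main() should cut the total time to roughly that of the slowest single request.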

There are a vast number of libraries available in Python to help with concurrency and parallelism. Many of the lower-level classes that Python provides (including Thread, Task, and Semaphore) were omitted from this article in favor of higher-level libraries. While covering everything related to concurrency in Python would require a multi-hundred-page book, in this article I explored how certain concurrent programming terminology translates into Python code. The full code for this article is available on GitHub.

[1] "Context switch", https://en.wikipedia.org/wiki/Context_switch

[2] "Concurrent computing", https://en.wikipedia.org/wiki/Concurrent_computing

[3] Luciano Ramalho, Fluent Python (Beijing: O'Reilly, 2015), 533

[4] Brian Goetz, Java Concurrency in Practice (Upper Saddle River, NJ: Addison-Wesley Professional, 2006), 3

[5] "Thread (computing): Threads vs. processes", https://en.wikipedia.org/wiki/Thread_(computing)#Threads_vs._processes

[6] "The Impact on Multi-Threaded Python Programs", https://realpython.com/python-gil/#the-impact-on-multi-threaded-python-programs

[7] "Glossary: coroutine", https://docs.python.org/3/glossary.html#term-coroutine

[8] "Welcome to AIOHTTP", https://docs.aiohttp.org/en/stable/