Parallel tasks in Python: concurrent.futures


concurrent.futures is part of the standard library in Python 3.2+. If you're using an older version of Python, you need to install the futures package.

$ pip install futures


You should use the ProcessPoolExecutor for CPU intensive tasks and the ThreadPoolExecutor is suited for network operations or I/O. The ProcessPoolExecutor uses the multiprocessing module, which is not affected by GIL (Global Interpreter Lock) but also means that only picklable objects can be executed and returned.

In Python 3.5+, map() receives an optional argument: chunksize. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = '{0}'.format(a)
    r = requests.get(url)
    return r.json()['args']

start = time.time()

# if max_workers is None or not given, it will default to the number of processors, multiplied by 5
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in, range(30)):
        print('response: {0}'.format(result))

print('Use requests + ThreadPoolExecutor cost: {}'.format(time.time() - start))


executor.submit() and as_completed()

executor.submit() returns a Future object. A Future is basically an object that encapsulates an asynchronous execution of a function that will finish (or raise an exception) in the future.

The main difference between map and as_completed is that map returns the results in the order in which you pass iterables. On the other hand, the first result from the as_completed function is from whichever future completed first. Besides, iterating a map() returns results of futures; iterating a as_completed(futures) returns futures themselves.

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {}
    for i in range(42):
        url = '{0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
            data = future.result()
        except Exception as exc:
            print('fetch {0}, get {1}'.format(url, data))