Parallel tasks in Python: concurrent.futures

Parallel tasks in Python: concurrent.futures

TL;DR: concurrent.futures is well suited to Embarrassingly Parallel tasks. You could write concurrent code with a simple for loop.

executor.map() runs the same function multiple times with different parameters and executor.submit() accepts any function with arbitrary parameters.

Install

concurrent.futures is part of the standard library in Python 3.2+. If you're using an older version of Python, you need to install the futures package.

$ pip install futures

ref:
https://docs.python.org/3/library/concurrent.futures.html

executor.map()

You should use the ProcessPoolExecutor for CPU intensive tasks and the ThreadPoolExecutor is suited for network operations or I/O. The ProcessPoolExecutor uses the multiprocessing module, which is not affected by GIL (Global Interpreter Lock) but also means that only picklable objects can be executed and returned.

In Python 3.5+, executor.map() receives an optional argument: chunksize. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.

from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = 'http://httpbin.org/get?a={0}'.format(a)
    r = requests.get(url)
    result = r.json()['args']
    return result

start = time.time()

# if max_workers is None or not given, it will default to the number of processors, multiplied by 5
with ThreadPoolExecutor(max_workers=None) as executor:
    for result in executor.map(fetch, range(42)):
        print('response: {0}'.format(result))

print('time: {0}'.format(time.time() - start))

You might want to change the value of max_workers to 1 and observe the difference.

ref:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
https://www.blog.pythonlibrary.org/2016/08/03/python-3-concurrency-the-concurrent-futures-module/
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

executor.submit()

executor.submit() returns a Future object. A Future is basically an object that encapsulates an asynchronous execution of a function that will finish (or raise an exception) in the future.

The main difference between map and as_completed is that map returns the results in the order in which you pass iterables. On the other hand, the first result from the as_completed function is from whichever future completed first. Besides, iterating a map() returns results of futures; iterating a as_completed(futures) returns futures themselves.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

start = time.time()

with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {}
    for i in range(42):
        url = 'https://httpbin.org/get?i={0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as exc:
            print(exc)
        else:
            print('fetch {0}, get {1}'.format(url, data))

print('time: {0}'.format(time.time() - start))

ref:
https://docs.python.org/3/library/concurrent.futures.html#future-objects

Discussion

ref:
https://news.ycombinator.com/item?id=16737129