TL;DR: concurrent.futures is well suited to embarrassingly parallel tasks: you can write concurrent code with little more than a simple for loop. executor.map() runs the same function over an iterable of different arguments, while executor.submit() schedules a single call to any function with arbitrary arguments and gives you back a Future.
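To make the TL;DR concrete, here is a minimal sketch of both calls; the double function is made up for illustration, and the examples further down do real work.

from concurrent.futures import ThreadPoolExecutor

def double(n):
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map(): same function, different parameters, results in input order
    print(list(executor.map(double, [1, 2, 3])))  # [2, 4, 6]

    # executor.submit(): schedule one call and get a Future back
    future = executor.submit(double, 10)
    print(future.result())  # 20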
Install
concurrent.futures has been part of the standard library since Python 3.2. If you're running Python 2, you need to install the futures backport package.
$ pip install futures
ref:
https://docs.python.org/3/library/concurrent.futures.html
executor.map()
Use the ProcessPoolExecutor for CPU-intensive tasks; the ThreadPoolExecutor is better suited to network operations or other I/O. The ProcessPoolExecutor is built on the multiprocessing module, so it is not affected by the GIL (Global Interpreter Lock), but that also means only picklable objects can be executed and returned. In Python 3.5+, executor.map() accepts an optional chunksize argument. For very long iterables, a large chunksize can significantly improve performance compared to the default of 1. With ThreadPoolExecutor, chunksize has no effect.
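As a rough sketch (the is_prime function and the number range are made up for illustration), this is what a CPU-bound job with a non-default chunksize might look like under a ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    # CPU-bound work: naive trial division
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True

# the __main__ guard matters here: worker processes re-import this module
# on platforms that spawn instead of fork
if __name__ == '__main__':
    numbers = range(10000000, 10001000)
    with ProcessPoolExecutor() as executor:
        # chunksize=100 ships the inputs to workers in batches of 100,
        # which cuts down on inter-process communication overhead
        for n, prime in zip(numbers, executor.map(is_prime, numbers, chunksize=100)):
            print('{0}: {1}'.format(n, prime))

For I/O-bound work such as the HTTP requests below, ThreadPoolExecutor is the better fit: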
from concurrent.futures import ThreadPoolExecutor
import time

import requests

def fetch(a):
    url = 'http://httpbin.org/get?a={0}'.format(a)
    r = requests.get(url)
    result = r.json()['args']
    return result

start = time.time()

# if max_workers is None or not given, it defaults to the number of processors
# multiplied by 5 (in Python 3.8+ the default is min(32, os.cpu_count() + 4))
with ThreadPoolExecutor(max_workers=None) as executor:
    # executor.map() yields results in the same order as the input range
    for result in executor.map(fetch, range(42)):
        print('response: {0}'.format(result))

print('time: {0}'.format(time.time() - start))
You might want to change the value of max_workers to 1 and observe the difference: with a single worker the requests run one after another, so the total time grows roughly linearly with the number of URLs.
ref:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
https://www.blog.pythonlibrary.org/2016/08/03/python-3-concurrency-the-concurrent-futures-module/
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html
executor.submit()
executor.submit() returns a Future object. A Future is basically an object that encapsulates an asynchronous execution of a function, one that will finish (or raise an exception) at some point in the future.
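As a quick, hypothetical illustration of the Future API (slow_add is made up for the example), the sketch below submits a single call, polls it, and then blocks on the result:

from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(x, y):
    time.sleep(1)
    return x + y

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_add, 1, 2)
    print(future.done())    # most likely False, the call is still running
    print(future.result())  # blocks until the call finishes, then prints 3
    print(future.done())    # True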
The main difference between map() and as_completed() is that map() returns results in the order in which you passed the iterables, while as_completed() yields whichever future completes first. Besides, iterating over map() gives you the results of the futures directly, whereas iterating over as_completed(futures) gives you the Future objects themselves, so you still call future.result() on each one.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests

def fetch(url, timeout):
    r = requests.get(url, timeout=timeout)
    data = r.json()['args']
    return data

start = time.time()

with ThreadPoolExecutor(max_workers=20) as executor:
    # map each Future back to the url it fetches so we can report it later
    futures = {}
    for i in range(42):
        url = 'https://httpbin.org/get?i={0}'.format(i)
        future = executor.submit(fetch, url, 60)
        futures[future] = url

    # as_completed() yields futures as they finish, regardless of submission order
    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as exc:
            print(exc)
        else:
            print('fetch {0}, get {1}'.format(url, data))

print('time: {0}'.format(time.time() - start))
ref:
https://docs.python.org/3/library/concurrent.futures.html#future-objects