Python多任务库concurrent.futures
介绍
concurrent.futures是在python 3.2中加入到Python标准库中的,不过在Python2中也可以安装使用。这个库来源于java.util.concurrent。提供了管理线程和进程多任务一致的接口。
Executor
Executor是一个抽象基类,它提供了一组管理并发的接口。
submit(fn, *args, **kwargs)
: 返回一个Future对象。map(fn, *iterables, timeout=None, chunksize=1)
: 提供了Python内置的map函数一致的接口。chunksize用于ProcessPoolExcutor一次性提供多个数据给执行进程。shutdown(wait=True)
: 通知正在执行的任务释放资源,wait为False的时候会直接返回,不过Python程序会等到所有任务释放资源后才退出。
futures库实现了Executor的两个子类。
- ThreadPoolExecutor: 因为GIL的限制,Python中多线程不能完全发挥CPU的性能,所以更适合I/O密集型的任务。
- ProcessPoolExecutor: ProcessPoolExecutor使用了多进程,所以摆脱了GIL的限制,更适合CPU密集型的任务。但也因此只能使用pickable的函数和数据。
Future
cancel()
: 取消当前任务的执行,如果已经执行了或者不能取消时返回False,否则返回True。cancelled()
: 判断是否成功取消。running()
: 判断是否正在运行。done()
: 判断是否成功取消或者执行完毕。result(timeout=None)
: 获取执行结果。exception(timeout=None)
: 获取执行中抛出的异常。add_done_callback(fn)
: 添加回调函数。
as_completed(fs, timeout=None)
接收一个Future序列,按完成顺序返回Future对象。
wait(fs, timeout=None, return_when=ALL_COMPLETED)
接收一个Future序列,返回一个(done, not_done)
的tuple
。
import concurrent.futures
import urllib.request
URLS = ['https://www.foxnews.com/',
'https://www.cnn.com/',
'https://europe.wsj.com/',
'https://www.bbc.co.uk/',
'https://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
死锁
并发的时候肯定要考虑死锁的问题,不过使用Executor时要特别注意Excutor对象本身的资源占用也有可能会造成死锁。比如官方的例子:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
tornado中的使用
tornado在tornado.concurrent中实现了自定义的Future,和标准的Future有少许差别。不过tornado可以兼容使用标准库的Future对象。
class AsyncHandler(web.RequestHandler):
@gen.coroutine
def get(self):
with ThreadPoolExecutor(max_workers=1) as executor:
resp = yield executor.submit(requests.get, "https://example.com")
self.write(resp.text)
参考资料: