PEP: 3148
Title: futures - execute computations asynchronously
Version: $Revision$
Last-Modified: $Date$
Author: Brian Quinlan <brian@sweetapp.com>
Status: Final
Type: Standards Track
Content-Type: text/x-rst
Created: 16-Oct-2009
Python-Version: 3.2
Post-History:

========
Abstract
========

This PEP proposes a design for a package that facilitates the
evaluation of callables using threads and processes.

==========
Motivation
==========

Python currently has powerful primitives to construct multi-threaded
and multi-process applications but parallelizing simple operations
requires a lot of work, i.e. explicitly launching processes/threads,
constructing a work/results queue, and waiting for completion or some
other termination condition (e.g. failure, timeout). It is also
difficult to design an application with a global process/thread limit
when each component invents its own parallel execution strategy.

=============
Specification
=============

Naming
------

The proposed package would be called "futures" and would live in a new
"concurrent" top-level package. The rationale for pushing the futures
library into a "concurrent" namespace has several components. The
first and simplest is to prevent any confusion with the existing
"from __future__ import x" idiom, which has been in use for a long
time within Python. Additionally, adding the "concurrent" prefix to
the name makes clear what the library relates to, namely concurrency;
this should clear up any additional ambiguity, since it has been noted
that not everyone in the community is familiar with Java Futures, or
with the term "futures" except as it relates to the US stock market.

Finally, we are carving out a new namespace for the standard library,
named "concurrent". We hope to either add, or move existing,
concurrency-related libraries to it in the future. A prime example
is the multiprocessing.Pool work, as well as other "addons" included
in that module, which work across thread and process boundaries.

Interface
---------

The proposed package provides two core classes: ``Executor`` and
``Future``. An ``Executor`` receives asynchronous work requests (in terms
of a callable and its arguments) and returns a ``Future`` to represent
the execution of that work request.

Executor
''''''''

``Executor`` is an abstract class that provides methods to execute calls
asynchronously.

``submit(fn, *args, **kwargs)``

    Schedules the callable to be executed as ``fn(*args, **kwargs)``
    and returns a ``Future`` instance representing the execution of the
    callable.

    This is an abstract method and must be implemented by ``Executor``
    subclasses.

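    As an illustrative sketch (using the ``ThreadPoolExecutor``
    described below), a call to the built-in ``pow`` could be scheduled
    and its result retrieved as follows::

        from concurrent import futures

        executor = futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(pow, 323, 1235)  # returns immediately
        print(future.result())   # blocks until the call has finished
        executor.shutdown()
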
``map(func, *iterables, timeout=None)``

    Equivalent to ``map(func, *iterables)`` but func is executed
    asynchronously and several calls to func may be made concurrently.
    The returned iterator raises a ``TimeoutError`` if ``__next__()`` is
    called and the result isn't available after *timeout* seconds from
    the original call to ``map()``. If *timeout* is not specified or
    ``None`` then there is no limit to the wait time. If a call raises
    an exception then that exception will be raised when its value is
    retrieved from the iterator.

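    A brief sketch of the timeout behavior (``ThreadPoolExecutor`` is
    described below; the one-second limit is arbitrary)::

        from concurrent import futures
        import time

        executor = futures.ThreadPoolExecutor(max_workers=2)
        try:
            for result in executor.map(time.sleep, [0.1, 10], timeout=1):
                print('one call finished')
        except futures.TimeoutError:
            print('a result was not available within 1 second')
        # The 10-second call still runs to completion before the
        # program exits.
        executor.shutdown(wait=False)
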
``shutdown(wait=True)``

    Signal the executor that it should free any resources that it is
    using when the currently pending futures are done executing.
    Calls to ``Executor.submit`` and ``Executor.map`` made after
    shutdown will raise ``RuntimeError``.

    If wait is ``True`` then this method will not return until all the
    pending futures are done executing and the resources associated
    with the executor have been freed. If wait is ``False`` then this
    method will return immediately and the resources associated with
    the executor will be freed when all pending futures are done
    executing. Regardless of the value of wait, the entire Python
    program will not exit until all pending futures are done
    executing.

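    A small sketch of the difference between the two modes
    (``ThreadPoolExecutor`` is described below)::

        from concurrent import futures
        import time

        executor = futures.ThreadPoolExecutor(max_workers=1)
        executor.submit(time.sleep, 5)
        executor.shutdown(wait=False)   # returns immediately
        print('shutdown requested')
        # The interpreter still waits for the sleeping worker to
        # finish before exiting.
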
| ``__enter__()``
| ``__exit__(exc_type, exc_val, exc_tb)``

    When using an executor as a context manager, ``__exit__`` will call
    ``Executor.shutdown(wait=True)``.

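    For example, this sketch relies on the context manager to shut the
    executor down; ``future.result()`` does not block after the
    ``with`` block because all pending futures are already done::

        from concurrent import futures

        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(pow, 2, 10)
        print(future.result())  # prints 1024
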
ProcessPoolExecutor
'''''''''''''''''''

The ``ProcessPoolExecutor`` class is an ``Executor`` subclass that uses a
pool of processes to execute calls asynchronously. The callable
objects and arguments passed to ``ProcessPoolExecutor.submit`` must be
pickleable according to the same limitations as the multiprocessing
module.

Calling ``Executor`` or ``Future`` methods from within a callable
submitted to a ``ProcessPoolExecutor`` will result in deadlock.

``__init__(max_workers)``

    Executes calls asynchronously using a pool of at most
    *max_workers* processes. If *max_workers* is ``None`` or not given
    then as many worker processes will be created as the machine has
    processors.

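    A minimal sketch of ``submit`` with a process pool;
    ``math.factorial`` is used only because it is pickleable and
    CPU-bound::

        from concurrent import futures
        import math

        if __name__ == '__main__':
            with futures.ProcessPoolExecutor(max_workers=2) as executor:
                future = executor.submit(math.factorial, 10)
                print(future.result())  # prints 3628800
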
ThreadPoolExecutor
''''''''''''''''''

The ``ThreadPoolExecutor`` class is an ``Executor`` subclass that uses a
pool of threads to execute calls asynchronously.

Deadlock can occur when the callable associated with a ``Future`` waits
on the results of another ``Future``. For example::

    import time
    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5

    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6


    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

And::

    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())

    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)

``__init__(max_workers)``

    Executes calls asynchronously using a pool of at most
    *max_workers* threads.

Future Objects
''''''''''''''

The ``Future`` class encapsulates the asynchronous execution of a
callable. ``Future`` instances are returned by ``Executor.submit``.

``cancel()``

    Attempt to cancel the call. If the call is currently being
    executed then it cannot be cancelled and the method will return
    ``False``, otherwise the call will be cancelled and the method will
    return ``True``.

``cancelled()``

    Return ``True`` if the call was successfully cancelled.

``running()``

    Return ``True`` if the call is currently being executed and cannot
    be cancelled.

``done()``

    Return ``True`` if the call was successfully cancelled or finished
    running.

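An illustrative sketch of these states, using a single worker so that
the second submitted call is still pending when ``cancel`` is
attempted::

    from concurrent import futures
    import time

    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(time.sleep, 2)
        second = executor.submit(time.sleep, 2)
        time.sleep(0.1)            # give the worker a moment to start
        print(first.running())     # True: being executed
        print(first.cancel())      # False: cannot cancel a running call
        print(second.cancel())     # True: it never started
        print(second.cancelled())  # True
    print(first.done())            # True once the pool has shut down
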
``result(timeout=None)``

    Return the value returned by the call. If the call hasn't yet
    completed then this method will wait up to *timeout* seconds. If
    the call hasn't completed in *timeout* seconds then a
    ``TimeoutError`` will be raised. If *timeout* is not specified or
    ``None`` then there is no limit to the wait time.

    If the future is cancelled before completing then ``CancelledError``
    will be raised.

    If the call raised then this method will raise the same exception.

``exception(timeout=None)``

    Return the exception raised by the call. If the call hasn't yet
    completed then this method will wait up to *timeout* seconds. If
    the call hasn't completed in *timeout* seconds then a
    ``TimeoutError`` will be raised. If *timeout* is not specified or
    ``None`` then there is no limit to the wait time.

    If the future is cancelled before completing then ``CancelledError``
    will be raised.

    If the call completed without raising then ``None`` is returned.

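A sketch contrasting ``result`` and ``exception`` for a call that
raises::

    from concurrent import futures

    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(int, 'not a number')
    print(future.exception())    # prints the ValueError raised by the call
    try:
        future.result()
    except ValueError as e:
        print('result() re-raised: %s' % e)
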
``add_done_callback(fn)``

    Attaches a callable *fn* to the future that will be called when
    the future is cancelled or finishes running. *fn* will be called
    with the future as its only argument.

    Added callables are called in the order that they were added and
    are always called in a thread belonging to the process that added
    them. If the callable raises an ``Exception`` then it will be
    logged and ignored. If the callable raises another
    ``BaseException`` then behavior is not defined.

    If the future has already completed or been cancelled then *fn*
    will be called immediately.

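A short sketch of a completion callback; the callback runs in a thread
of this process once the call finishes (or immediately, if it has
already finished)::

    from concurrent import futures

    def report_done(future):
        print('call finished, result=%r' % future.result())

    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        f = executor.submit(pow, 2, 5)
        f.add_done_callback(report_done)  # eventually prints result=32
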
Internal Future Methods
^^^^^^^^^^^^^^^^^^^^^^^

The following ``Future`` methods are meant for use in unit tests and
``Executor`` implementations.

``set_running_or_notify_cancel()``

    Should be called by ``Executor`` implementations before executing
    the work associated with the ``Future``.

    If the method returns ``False`` then the ``Future`` was cancelled,
    i.e. ``Future.cancel`` was called and returned ``True``. Any threads
    waiting on the ``Future`` completing (i.e. through ``as_completed()``
    or ``wait()``) will be woken up.

    If the method returns ``True`` then the ``Future`` was not cancelled
    and has been put in the running state, i.e. calls to
    ``Future.running()`` will return ``True``.

    This method can only be called once and cannot be called after
    ``Future.set_result()`` or ``Future.set_exception()`` have been
    called.

``set_result(result)``

    Sets the result of the work associated with the ``Future``.

``set_exception(exception)``

    Sets the result of the work associated with the ``Future`` to the
    given ``Exception``.

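To illustrate how these methods fit together, here is a rough sketch
of a toy ``Executor`` that runs each call in a dedicated thread; it is
not part of the proposal and omits ``shutdown`` bookkeeping::

    import threading
    from concurrent import futures

    class ThreadPerCallExecutor(futures.Executor):
        """Toy executor: one new thread per submitted call."""

        def submit(self, fn, *args, **kwargs):
            future = futures.Future()

            def run():
                # Honor a cancellation that happened before the call started.
                if not future.set_running_or_notify_cancel():
                    return
                try:
                    result = fn(*args, **kwargs)
                except BaseException as e:
                    future.set_exception(e)
                else:
                    future.set_result(result)

            threading.Thread(target=run).start()
            return future
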
Module Functions
''''''''''''''''

``wait(fs, timeout=None, return_when=ALL_COMPLETED)``

    Wait for the ``Future`` instances (possibly created by different
    ``Executor`` instances) given by *fs* to complete. Returns a named
    2-tuple of sets. The first set, named "done", contains the
    futures that completed (finished or were cancelled) before the
    wait completed. The second set, named "not_done", contains
    uncompleted futures.

    *timeout* can be used to control the maximum number of seconds to
    wait before returning. If *timeout* is not specified or ``None``
    then there is no limit to the wait time.

    *return_when* indicates when the method should return. It must be
    one of the following constants:

    ============================= ==================================================
    Constant                      Description
    ============================= ==================================================
    ``FIRST_COMPLETED``           The method will return when any future finishes or
                                  is cancelled.
    ``FIRST_EXCEPTION``           The method will return when any future finishes by
                                  raising an exception. If no future raises an
                                  exception then it is equivalent to ALL_COMPLETED.
    ``ALL_COMPLETED``             The method will return when all calls finish.
    ============================= ==================================================

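    A sketch of ``wait`` with ``FIRST_COMPLETED``; the second call is
    still sleeping when the first one finishes::

        from concurrent import futures
        import time

        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            fs = [executor.submit(time.sleep, t) for t in (0.1, 5)]
            done, not_done = futures.wait(
                fs, return_when=futures.FIRST_COMPLETED)
            print(len(done), len(not_done))  # most likely prints '1 1'
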
``as_completed(fs, timeout=None)``

    Returns an iterator over the ``Future`` instances given by *fs* that
    yields futures as they complete (finished or were cancelled). Any
    futures that completed before ``as_completed()`` was called will be
    yielded first. The returned iterator raises a ``TimeoutError`` if
    ``__next__()`` is called and the result isn't available after
    *timeout* seconds from the original call to ``as_completed()``. If
    *timeout* is not specified or ``None`` then there is no limit to the
    wait time.

    The ``Future`` instances can have been created by different
    ``Executor`` instances.

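    A brief sketch of ``as_completed`` with a timeout; the slow call is
    abandoned by the iterator but still runs to completion::

        from concurrent import futures
        import time

        executor = futures.ThreadPoolExecutor(max_workers=2)
        fs = [executor.submit(time.sleep, t) for t in (0.1, 10)]
        try:
            for future in futures.as_completed(fs, timeout=1):
                print('%r completed' % future)
        except futures.TimeoutError:
            print('gave up waiting after 1 second')
        executor.shutdown(wait=False)
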
Check Prime Example
-------------------

::

    from concurrent import futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        # Handle small and even numbers explicitly.
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False

        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    def main():
        with futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime,
                                                          PRIMES)):
                print('%d is prime: %s' % (number, prime))

    if __name__ == '__main__':
        main()

Web Crawl Example
-----------------

::

    from concurrent import futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()

    def main():
        with futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = dict(
                (executor.submit(load_url, url, 60), url)
                for url in URLS)

            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    print('%r page is %d bytes' % (
                        url, len(future.result())))
                except Exception as e:
                    print('%r generated an exception: %s' % (
                        url, e))

    if __name__ == '__main__':
        main()

=========
Rationale
=========

The proposed design of this module was heavily influenced by the
Java java.util.concurrent package [1]_. The conceptual basis of the
module, as in Java, is the Future class, which represents the progress
and result of an asynchronous computation. The Future class makes
little commitment to the evaluation mode being used, e.g. it can be
used to represent lazy or eager evaluation, for evaluation using
threads, processes or remote procedure call.

Futures are created by concrete implementations of the Executor class
(called ExecutorService in Java). The reference implementation
provides classes that use either a process or a thread pool to eagerly
evaluate computations.

Futures have already been seen in Python as part of a popular Python
cookbook recipe [2]_ and have been discussed on the Python-3000 mailing
list [3]_.

The proposed design is explicit, i.e. it requires that clients be
aware that they are consuming Futures. It would be possible to design
a module that would return proxy objects (in the style of ``weakref``)
that could be used transparently. It is possible to build a proxy
implementation on top of the proposed explicit mechanism.

The proposed design does not introduce any changes to Python language
syntax or semantics. Special syntax could be introduced [4]_ to mark
function and method calls as asynchronous. A proxy result would be
returned while the operation is eagerly evaluated asynchronously, and
execution would only block if the proxy object were used before the
operation completed.

Anh Hai Trinh proposed a simpler but more limited API concept [5]_ and
the API has been discussed in some detail on stdlib-sig [6]_.

The proposed design was discussed on the Python-Dev mailing list [7]_.
Following those discussions, the following changes were made:

* The ``Executor`` class was made into an abstract base class
* The ``Future.remove_done_callback`` method was removed due to a lack
  of convincing use cases
* The ``Future.add_done_callback`` method was modified to allow the
  same callable to be added many times
* The ``Future`` class's mutation methods were better documented to
  indicate that they are private to the ``Executor`` that created them

========================
Reference Implementation
========================

The reference implementation [8]_ contains a complete implementation
of the proposed design. It has been tested on Linux and Mac OS X.

==========
References
==========

.. [1]
   ``java.util.concurrent`` package documentation
   http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html

.. [2]
   Python Cookbook recipe 84317, "Easy threading with Futures"
   http://code.activestate.com/recipes/84317/

.. [3]
   ``Python-3000`` thread, "mechanism for handling asynchronous concurrency"
   https://mail.python.org/pipermail/python-3000/2006-April/000960.html

.. [4]
   ``Python 3000`` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)"
   https://mail.python.org/pipermail/python-3000/2006-April/000970.html

.. [5]
   A discussion of ``stream``, a similar concept proposed by Anh Hai Trinh
   http://www.mail-archive.com/stdlib-sig@python.org/msg00480.html

.. [6]
   A discussion of the proposed API on stdlib-sig
   https://mail.python.org/pipermail/stdlib-sig/2009-November/000731.html

.. [7]
   A discussion of the PEP on python-dev
   https://mail.python.org/pipermail/python-dev/2010-March/098169.html

.. [8]
   Reference ``futures`` implementation
   http://code.google.com/p/pythonfutures/source/browse/#svn/branches/feedback

=========
Copyright
=========

This document has been placed in the public domain.


..
   Local Variables:
   mode: indented-text
   indent-tabs-mode: nil
   sentence-end-double-space: t
   fill-column: 70
   coding: utf-8
   End: