477 lines
16 KiB
Plaintext
477 lines
16 KiB
Plaintext
PEP: 3148
|
||
Title: futures - execute computations asynchronously
|
||
Version: $Revision$
|
||
Last-Modified: $Date$
|
||
Author: Brian Quinlan <brian@sweetapp.com>
|
||
Status: Draft
|
||
Type: Standards Track
|
||
Content-Type: text/x-rst
|
||
Created: 16-Oct-2009
|
||
Python-Version: 3.2
|
||
Post-History:
|
||
|
||
========
|
||
Abstract
|
||
========
|
||
|
||
This PEP proposes a design for a package that facilitates the
|
||
evaluation of callables using threads and processes.
|
||
|
||
==========
|
||
Motivation
|
||
==========
|
||
|
||
Python currently has powerful primitives to construct multi-threaded
|
||
and multi-process applications but parallelizing simple operations
|
||
requires a lot of work i.e. explicitly launching processes/threads,
|
||
constructing a work/results queue, and waiting for completion or some
|
||
other termination condition (e.g. failure, timeout). It is also
|
||
difficult to design an application with a global process/thread limit
|
||
when each component invents its own parallel execution strategy.
|
||
|
||
=============
|
||
Specification
|
||
=============
|
||
|
||
Naming
|
||
------
|
||
|
||
The proposed package would be called "futures" and would live in a new
|
||
"concurrent" top-level package. The rationale behind pushing the
|
||
futures library into a "concurrent" namespace has multiple components.
|
||
The first, most simple one is to prevent any and all confusion with
|
||
the existing "from __future__ import x" idiom which has been in use
|
||
for a long time within Python. Additionally, it is felt that adding
|
||
the "concurrent" precursor to the name fully denotes what the library
|
||
is related to - namely concurrency - this should clear up any addition
|
||
ambiguity as it has been noted that not everyone in the community is
|
||
familiar with Java Futures, or the Futures term except as it relates
|
||
to the US stock market.
|
||
|
||
Finally; we are carving out a new namespace for the standard library -
|
||
obviously named "concurrent". In this namespace we hope to either add,
|
||
or move existing, concurrency-related libraries to this in the future.
|
||
A prime example is the multiprocessing.Pool work, as well as other
|
||
"addons" included in that module, which work across thread, and
|
||
process boundaries.
|
||
|
||
Interface
|
||
---------
|
||
|
||
The proposed package provides two core classes: `Executor` and
|
||
`Future`. An `Executor` receives asynchronous work requests (in terms
|
||
of a callable and its arguments) and returns a `Future` to represent
|
||
the execution of that work request.
|
||
|
||
Executor
|
||
''''''''
|
||
|
||
`Executor` is an abstract class that provides methods to execute calls
|
||
asynchronously.
|
||
|
||
``submit(fn, *args, **kwargs)``
|
||
|
||
Schedules the callable to be executed as ``fn(*args, **kwargs)``
|
||
and returns a `Future` instance representing the execution of the
|
||
callable.
|
||
|
||
This is an abstract method and must be implemented by Executor
|
||
subclasses.
|
||
|
||
``map(func, *iterables, timeout=None)``
|
||
|
||
Equivalent to ``map(func, *iterables)`` but func is executed
|
||
asynchronously and several calls to func may be made concurrently.
|
||
The returned iterator raises a `TimeoutError` if `__next__()` is
|
||
called and the result isn't available after *timeout* seconds from
|
||
the original call to `map()`. If *timeout* is not specified or
|
||
`None` then there is no limit to the wait time. If a call raises
|
||
an exception then that exception will be raised when its value is
|
||
retrieved from the iterator.
|
||
|
||
``shutdown(wait=True)``
|
||
|
||
Signal the executor that it should free any resources that it is
|
||
using when the currently pending futures are done executing.
|
||
Calls to `Executor.submit` and `Executor.map` and made after
|
||
shutdown will raise `RuntimeError`.
|
||
|
||
If wait is `True` then this method will not return until all the
|
||
pending futures are done executing and the resources associated
|
||
with the executor have been freed. If wait is `False` then this
|
||
method will return immediately and the resources associated with
|
||
the executor will be freed when all pending futures are done
|
||
executing. Regardless of the value of wait, the entire Python
|
||
program will not exit until all pending futures are done
|
||
executing.
|
||
|
||
| ``__enter__()``
|
||
| ``__exit__(exc_type, exc_val, exc_tb)``
|
||
|
||
When using an executor as a context manager, `__exit__` will call
|
||
``Executor.shutdown(wait=True)``.
|
||
|
||
|
||
ProcessPoolExecutor
|
||
'''''''''''''''''''
|
||
|
||
The `ProcessPoolExecutor` class is an `Executor` subclass that uses a
|
||
pool of processes to execute calls asynchronously. The callable
|
||
objects and arguments passed to `ProcessPoolExecutor.submit` must be
|
||
pickleable according to the same limitations as the multiprocessing
|
||
module.
|
||
|
||
Calling `Executor` or `Future` methods from within a callable
|
||
submitted to a `ProcessPoolExecutor` will result in deadlock.
|
||
|
||
``__init__(max_workers)``
|
||
|
||
Executes calls asynchronously using a pool of a most *max_workers*
|
||
processes. If *max_workers* is ``None`` or not given then as many
|
||
worker processes will be created as the machine has processors.
|
||
|
||
ThreadPoolExecutor
|
||
''''''''''''''''''
|
||
|
||
The `ThreadPoolExecutor` class is an `Executor` subclass that uses a
|
||
pool of threads to execute calls asynchronously.
|
||
|
||
Deadlock can occur when the callable associated with a `Future` waits
|
||
on the results of another `Future`. For example::
|
||
|
||
import time
|
||
def wait_on_b():
|
||
time.sleep(5)
|
||
print(b.result()) # b will never complete because it is waiting on a.
|
||
return 5
|
||
|
||
def wait_on_a():
|
||
time.sleep(5)
|
||
print(a.result()) # a will never complete because it is waiting on b.
|
||
return 6
|
||
|
||
|
||
executor = ThreadPoolExecutor(max_workers=2)
|
||
a = executor.submit(wait_on_b)
|
||
b = executor.submit(wait_on_a)
|
||
|
||
And::
|
||
|
||
def wait_on_future():
|
||
f = executor.submit(pow, 5, 2)
|
||
# This will never complete because there is only one worker thread and
|
||
# it is executing this function.
|
||
print(f.result())
|
||
|
||
executor = ThreadPoolExecutor(max_workers=1)
|
||
executor.submit(wait_on_future)
|
||
|
||
``__init__(max_workers)``
|
||
|
||
Executes calls asynchronously using a pool of at most
|
||
*max_workers* threads.
|
||
|
||
Future Objects
|
||
''''''''''''''
|
||
|
||
The `Future` class encapsulates the asynchronous execution of a
|
||
callable. `Future` instances are returned by `Executor.submit`.
|
||
|
||
``cancel()``
|
||
|
||
Attempt to cancel the call. If the call is currently being
|
||
executed then it cannot be cancelled and the method will return
|
||
`False`, otherwise the call will be cancelled and the method will
|
||
return `True`.
|
||
|
||
``cancelled()``
|
||
|
||
Return `True` if the call was successfully cancelled.
|
||
|
||
``running()``
|
||
|
||
Return `True` if the call is currently being executed and cannot
|
||
be cancelled.
|
||
|
||
``done()``
|
||
|
||
Return `True` if the call was successfully cancelled or finished
|
||
running.
|
||
|
||
``result(timeout=None)``
|
||
|
||
Return the value returned by the call. If the call hasn't yet
|
||
completed then this method will wait up to *timeout* seconds. If
|
||
the call hasn't completed in *timeout* seconds then a
|
||
`TimeoutError` will be raised. If *timeout* is not specified or
|
||
`None` then there is no limit to the wait time.
|
||
|
||
If the future is cancelled before completing then `CancelledError`
|
||
will be raised.
|
||
|
||
If the call raised then this method will raise the same exception.
|
||
|
||
``exception(timeout=None)``
|
||
|
||
Return the exception raised by the call. If the call hasn't yet
|
||
completed then this method will wait up to *timeout* seconds. If
|
||
the call hasn't completed in *timeout* seconds then a
|
||
`TimeoutError` will be raised. If *timeout* is not specified or
|
||
``None`` then there is no limit to the wait time.
|
||
|
||
If the future is cancelled before completing then `CancelledError`
|
||
will be raised.
|
||
|
||
If the call completed without raising then `None` is returned.
|
||
|
||
``add_done_callback(fn)``
|
||
|
||
Attaches a callable *fn* to the future that will be called when
|
||
the future is cancelled or finishes running. *fn* will be called
|
||
with the future as its only argument. Added callables are called
|
||
in the order that they were added and are always called in a
|
||
thread belonging to the process that added them.
|
||
|
||
If the future has already completed or been cancelled then *fn*
|
||
will be called immediately.
|
||
|
||
Internal Future Methods
|
||
^^^^^^^^^^^^^^^^^^^^^^^
|
||
|
||
The following `Future` methods are meant for use in unit tests and
|
||
`Executor` implementations.
|
||
|
||
``set_running_or_notify_cancel()``
|
||
|
||
Should be called by `Executor` implementations before executing
|
||
the work associated with the `Future`.
|
||
|
||
If the method returns `False` then the `Future` was cancelled,
|
||
i.e. `Future.cancel` was called and returned `True`. Any threads
|
||
waiting on the `Future` completing (i.e. through `as_completed()`
|
||
or `wait()`) will be woken up.
|
||
|
||
If the method returns `True` then the `Future` was not cancelled
|
||
and has been put in the running state, i.e. calls to
|
||
`Future.running()` will return `True`.
|
||
|
||
This method can only be called once and cannot be called after
|
||
`Future.set_result()` or `Future.set_exception()` have been
|
||
called.
|
||
|
||
``set_result(result)``
|
||
|
||
Sets the result of the work associated with the `Future`.
|
||
|
||
``set_exception(exception)``
|
||
|
||
Sets the result of the work associated with the `Future` to the
|
||
given `Exception`.
|
||
|
||
Module Functions
|
||
''''''''''''''''
|
||
|
||
``wait(fs, timeout=None, return_when=ALL_COMPLETED)``
|
||
|
||
Wait for the `Future` instances (possibly created by different
|
||
`Executor` instances) given by *fs* to complete. Returns a named
|
||
2-tuple of sets. The first set, named "done", contains the
|
||
futures that completed (finished or were cancelled) before the
|
||
wait completed. The second set, named "not_done", contains
|
||
uncompleted futures.
|
||
|
||
*timeout* can be used to control the maximum number of seconds to
|
||
wait before returning. If timeout is not specified or None then
|
||
there is no limit to the wait time.
|
||
|
||
*return_when* indicates when the method should return. It must be
|
||
one of the following constants:
|
||
|
||
============================= ==================================================
|
||
Constant Description
|
||
============================= ==================================================
|
||
`FIRST_COMPLETED` The method will return when any future finishes or
|
||
is cancelled.
|
||
`FIRST_EXCEPTION` The method will return when any future finishes by
|
||
raising an exception. If not future raises an
|
||
exception then it is equivalent to ALL_COMPLETED.
|
||
`ALL_COMPLETED` The method will return when all calls finish.
|
||
============================= ==================================================
|
||
|
||
``as_completed(fs, timeout=None)``
|
||
|
||
Returns an iterator over the `Future` instances given by *fs* that
|
||
yields futures as they complete (finished or were cancelled). Any
|
||
futures that completed before `as_completed()` was called will be
|
||
yielded first. The returned iterator raises a `TimeoutError` if
|
||
`__next__()` is called and the result isn't available after
|
||
*timeout* seconds from the original call to `as_completed()`. If
|
||
*timeout* is not specified or `None` then there is no limit to the
|
||
wait time.
|
||
|
||
The `Future` instances can have been created by different
|
||
`Executor` instances.
|
||
|
||
Check Prime Example
|
||
-------------------
|
||
|
||
::
|
||
|
||
from concurrent import futures
|
||
import math
|
||
|
||
PRIMES = [
|
||
112272535095293,
|
||
112582705942171,
|
||
112272535095293,
|
||
115280095190773,
|
||
115797848077099,
|
||
1099726899285419]
|
||
|
||
def is_prime(n):
|
||
if n % 2 == 0:
|
||
return False
|
||
|
||
sqrt_n = int(math.floor(math.sqrt(n)))
|
||
for i in range(3, sqrt_n + 1, 2):
|
||
if n % i == 0:
|
||
return False
|
||
return True
|
||
|
||
def main():
|
||
with futures.ProcessPoolExecutor() as executor:
|
||
for number, prime in zip(PRIMES, executor.map(is_prime,
|
||
PRIMES)):
|
||
print('%d is prime: %s' % (number, prime))
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|
||
Web Crawl Example
|
||
-----------------
|
||
|
||
::
|
||
|
||
from concurrent import futures
|
||
import urllib.request
|
||
|
||
URLS = ['http://www.foxnews.com/',
|
||
'http://www.cnn.com/',
|
||
'http://europe.wsj.com/',
|
||
'http://www.bbc.co.uk/',
|
||
'http://some-made-up-domain.com/']
|
||
|
||
def load_url(url, timeout):
|
||
return urllib.request.urlopen(url, timeout=timeout).read()
|
||
|
||
def main():
|
||
with futures.ThreadPoolExecutor(max_workers=5) as executor:
|
||
future_to_url = dict(
|
||
(executor.submit(load_url, url, 60), url)
|
||
for url in URLS)
|
||
|
||
for future in futures.as_completed(future_to_url):
|
||
url = future_to_url[future]
|
||
if future.exception() is not None:
|
||
print('%r generated an exception: %s' % (
|
||
url, future.exception()))
|
||
else:
|
||
print('%r page is %d bytes' % (
|
||
url, len(future.result())))
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|
||
=========
|
||
Rationale
|
||
=========
|
||
|
||
The proposed design of this module was heavily influenced by the the
|
||
Java java.util.concurrent package [1]_. The conceptual basis of the
|
||
module, as in Java, is the Future class, which represents the progress
|
||
and result of an asynchronous computation. The Future class makes
|
||
little commitment to the evaluation mode being used e.g. it can be be
|
||
used to represent lazy or eager evaluation, for evaluation using
|
||
threads, processes or remote procedure call.
|
||
|
||
Futures are created by concrete implementations of the Executor class
|
||
(called ExecutorService in Java). The reference implementation
|
||
provides classes that use either a process or a thread pool to eagerly
|
||
evaluate computations.
|
||
|
||
Futures have already been seen in Python as part of a popular Python
|
||
cookbook recipe [2]_ and have discussed on the Python-3000 mailing
|
||
list [3]_.
|
||
|
||
The proposed design is explicit, i.e. it requires that clients be
|
||
aware that they are consuming Futures. It would be possible to design
|
||
a module that would return proxy objects (in the style of `weakref`)
|
||
that could be used transparently. It is possible to build a proxy
|
||
implementation on top of the proposed explicit mechanism.
|
||
|
||
The proposed design does not introduce any changes to Python language
|
||
syntax or semantics. Special syntax could be introduced [4]_ to mark
|
||
function and method calls as asynchronous. A proxy result would be
|
||
returned while the operation is eagerly evaluated asynchronously, and
|
||
execution would only block if the proxy object were used before the
|
||
operation completed.
|
||
|
||
Anh Hai Trinh proposed a simpler but more limited API concept [5]_ and
|
||
the API has been discussed in some detail on stdlib-sig [6]_.
|
||
|
||
========================
|
||
Reference Implementation
|
||
========================
|
||
|
||
The reference implementation [7]_ contains a complete implementation
|
||
of the proposed design. It has been tested on Linux and Mac OS X.
|
||
|
||
==========
|
||
References
|
||
==========
|
||
|
||
.. [1]
|
||
`java.util.concurrent` package documentation
|
||
`http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html`
|
||
|
||
.. [2]
|
||
Python Cookbook recipe 84317, "Easy threading with Futures"
|
||
`http://code.activestate.com/recipes/84317/`
|
||
|
||
.. [3]
|
||
`Python-3000` thread, "mechanism for handling asynchronous concurrency"
|
||
`http://mail.python.org/pipermail/python-3000/2006-April/000960.html`
|
||
|
||
.. [4]
|
||
`Python 3000` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)"
|
||
`http://mail.python.org/pipermail/python-3000/2006-April/000970.html`
|
||
|
||
.. [5]
|
||
A discussion of `stream`, a similar concept proposed by Anh Hai Trinh
|
||
`http://www.mail-archive.com/stdlib-sig@python.org/msg00480.html`
|
||
|
||
.. [6]
|
||
A discussion of the proposed API on stdlib-sig
|
||
`http://mail.python.org/pipermail/stdlib-sig/2009-November/000731.html`
|
||
|
||
.. [7]
|
||
Reference `futures` implementation
|
||
`http://code.google.com/p/pythonfutures/source/browse/#svn/branches/feedback`
|
||
|
||
=========
|
||
Copyright
|
||
=========
|
||
|
||
This document has been placed in the public domain.
|
||
|
||
|
||
|
||
..
|
||
Local Variables:
|
||
mode: indented-text
|
||
indent-tabs-mode: nil
|
||
sentence-end-double-space: t
|
||
fill-column: 70
|
||
coding: utf-8
|
||
End:
|