Update from Brian Quinlan:

Here are some updates based on the latest round of python-dev feedback. This update:

    - changes the names of the named-tuple returned by wait()
    - consistently uses the word "callable" rather than "function" or "method"
    - clarifies the calling context of future callbacks
    - removes the ability to remove callbacks
    - allows the same callback to be repeatedly added
    - adds a pointer to pickle
    - clarifies that the interpreter will not exit with running futures
    - clarifies that module methods accept futures from different executors
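The last item, that the module-level functions accept futures from different executors, can be sketched with the `concurrent.futures` module as it ships in the standard library today (the executor sizes and the `pow` calls are arbitrary choices for illustration):

```python
from concurrent import futures

# Two independent executors; the module-level wait() accepts a mixed
# set of futures drawn from both of them.
threads = futures.ThreadPoolExecutor(max_workers=1)
more_threads = futures.ThreadPoolExecutor(max_workers=1)

a = threads.submit(pow, 2, 3)
b = more_threads.submit(pow, 3, 2)

# Default return_when is ALL_COMPLETED, so both futures end up in "done".
done, not_done = futures.wait([a, b])
print(sorted(f.result() for f in done))  # -> [8, 9]

threads.shutdown()
more_threads.shutdown()
```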
Commit f93f3e1797 by Brett Cannon, 2010-06-28 04:54:01 +00:00 (parent 371c3fa34b)
1 changed file with 94 additions and 83 deletions


@@ -33,65 +33,6 @@ when each component invents its own parallel execution strategy.
 Specification
 =============
 
-Check Prime Example
--------------------
-
-::
-
-    from concurrent import futures
-    import math
-
-    PRIMES = [
-        112272535095293,
-        112582705942171,
-        112272535095293,
-        115280095190773,
-        115797848077099,
-        1099726899285419]
-
-    def is_prime(n):
-        if n % 2 == 0:
-            return False
-
-        sqrt_n = int(math.floor(math.sqrt(n)))
-        for i in range(3, sqrt_n + 1, 2):
-            if n % i == 0:
-                return False
-        return True
-
-    with futures.ProcessPoolExecutor() as executor:
-        for number, is_prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
-            print('%d is prime: %s' % (number, is_prime))
-
-Web Crawl Example
------------------
-
-::
-
-    from concurrent import futures
-    import urllib.request
-
-    URLS = ['http://www.foxnews.com/',
-            'http://www.cnn.com/',
-            'http://europe.wsj.com/',
-            'http://www.bbc.co.uk/',
-            'http://some-made-up-domain.com/']
-
-    def load_url(url, timeout):
-        return urllib.request.urlopen(url, timeout=timeout).read()
-
-    with futures.ThreadPoolExecutor(max_workers=5) as executor:
-        future_to_url = dict((executor.submit(load_url, url, 60), url)
-                             for url in URLS)
-
-        for future in futures.as_completed(future_to_url):
-            url = future_to_url[future]
-            if future.exception() is not None:
-                print('%r generated an exception: %s' % (url,
-                                                         future.exception()))
-            else:
-                print('%r page is %d bytes' % (url, len(future.result())))
 
 Interface
 ---------
@@ -111,7 +52,7 @@ asynchronously.
     Schedules the callable to be executed as ``fn(*args, **kwargs)``
     and returns a `Future` instance representing the execution of the
-    function.
+    callable.
 
     This is an abstract method and must be implemented by Executor
     subclasses.
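As a minimal sketch of `submit` returning a `Future`, using the standard-library module (`divmod` is an arbitrary callable chosen for illustration):

```python
from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit() schedules divmod(7, 3) and immediately returns a Future.
    future = executor.submit(divmod, 7, 3)

# Leaving the with-block shuts the executor down, so the result is ready.
print(future.result())  # -> (2, 1)
```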
@@ -134,9 +75,14 @@ asynchronously.
     Calls to `Executor.submit` and `Executor.map` made after
     shutdown will raise `RuntimeError`.
 
-    If wait is `True` then the executor will not return until all the
+    If wait is `True` then this method will not return until all the
     pending futures are done executing and the resources associated
-    with the executor have been freed.
+    with the executor have been freed. If wait is `False` then this
+    method will return immediately and the resources associated with
+    the executor will be freed when all pending futures are done
+    executing. Regardless of the value of wait, the entire Python
+    program will not exit until all pending futures are done
+    executing.
 
 | ``__enter__()``
 | ``__exit__(exc_type, exc_val, exc_tb)``
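A small sketch of the `wait=True` behaviour and the post-shutdown `RuntimeError`, using the standard-library module (the summed lists are arbitrary):

```python
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=2)
f = executor.submit(sum, [1, 2, 3])
executor.shutdown(wait=True)   # blocks until every pending future is done
print(f.result())              # -> 6

try:
    executor.submit(sum, [4, 5])
    raised = False
except RuntimeError:
    raised = True              # submitting after shutdown is an error
print(raised)                  # -> True
```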
@@ -151,7 +97,7 @@ ProcessPoolExecutor
 The `ProcessPoolExecutor` class is an `Executor` subclass that uses a
 pool of processes to execute calls asynchronously. The callable
 objects and arguments passed to `ProcessPoolExecutor.submit` must be
-serializeable according to the same limitations as the multiprocessing
+pickleable according to the same limitations as the multiprocessing
 module.
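The pickleability requirement can be probed directly with the pickle module; `square` is a hypothetical module-level callable used for illustration:

```python
import pickle

def square(x):
    # Module-level callables pickle by reference, so they can cross the
    # process boundary that ProcessPoolExecutor relies on.
    return x * x

clone = pickle.loads(pickle.dumps(square))
print(clone(3))  # -> 9

try:
    pickle.dumps(lambda x: x * x)
    ok = True
except Exception:
    ok = False   # a lambda fails the same pickling test
print(ok)        # -> False
```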
 Calling `Executor` or `Future` methods from within a callable
@@ -208,8 +154,7 @@ Future Objects
 ''''''''''''''
 
 The `Future` class encapsulates the asynchronous execution of a
-function or method call. `Future` instances are returned by
-`Executor.submit`.
+callable. `Future` instances are returned by `Executor.submit`.
 
 ``cancel()``
@@ -260,22 +205,14 @@ function or method call. `Future` instances are returned by
 ``add_done_callback(fn)``
 
-    Attaches a function *fn* to the future that will be called when
+    Attaches a callable *fn* to the future that will be called when
     the future is cancelled or finishes running. *fn* will be called
-    with the future as its only argument.
+    with the future as its only argument. Added callables are called
+    in the order that they were added and are always called in a
+    thread belonging to the process that added them.
 
     If the future has already completed or been cancelled then *fn*
-    will be called immediately. If the same function is added several
-    times then it will still only be called once.
-
-    NOTE: This method can be used to create adapters from Futures to
-    Twisted Deferreds.
-
-``remove_done_callback(fn)``
-
-    Removes the function *fn*, which was previously attached to the
-    future using `add_done_callback`. `KeyError` is raised if the
-    function was not previously attached.
+    will be called immediately.
 
 Internal Future Methods
 ^^^^^^^^^^^^^^^^^^^^^^^
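The revised callback semantics can be sketched as follows, using the module as released in the standard library (the `note` callable is illustrative; note that the same callable may now be added repeatedly and runs once per addition):

```python
from concurrent import futures

calls = []

def note(future):
    # Callbacks receive the future as their only argument.
    calls.append(future.result())

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    f = executor.submit(pow, 2, 10)
    f.add_done_callback(note)
    f.add_done_callback(note)  # the same callable, added a second time

# Leaving the with-block waits for the future, so both callbacks have run.
print(calls)  # -> [1024, 1024]
```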
@@ -315,11 +252,12 @@ Module Functions
 ``wait(fs, timeout=None, return_when=ALL_COMPLETED)``
 
-    Wait for the `Future` instances given by *fs* to complete.
-    Returns a named 2-tuple of sets. The first set, named "finished",
-    contains the futures that completed (finished or were cancelled)
-    before the wait completed. The second set, named "not_finished",
-    contains uncompleted futures.
+    Wait for the `Future` instances (possibly created by different
+    `Executor` instances) given by *fs* to complete. Returns a named
+    2-tuple of sets. The first set, named "done", contains the
+    futures that completed (finished or were cancelled) before the
+    wait completed. The second set, named "not_done", contains
+    uncompleted futures.
 
     *timeout* can be used to control the maximum number of seconds to
     wait before returning. If timeout is not specified or None then
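A short sketch of the renamed tuple fields, using the standard-library module (the sleep durations are arbitrary and chosen to make the outcome effectively deterministic):

```python
from concurrent import futures
import time

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(time.sleep, 0)
    slow = executor.submit(time.sleep, 0.5)
    # FIRST_COMPLETED returns as soon as one future finishes; the result
    # is a named 2-tuple with "done" and "not_done" sets.
    res = futures.wait([fast, slow], return_when=futures.FIRST_COMPLETED)
    print(fast in res.done, slow in res.not_done)  # -> True True
```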
@@ -350,6 +288,79 @@ Module Functions
     *timeout* is not specified or `None` then there is no limit to the
     wait time.
 
+    The `Future` instances can have been created by different
+    `Executor` instances.
+
+Check Prime Example
+-------------------
+
+::
+
+    from concurrent import futures
+    import math
+
+    PRIMES = [
+        112272535095293,
+        112582705942171,
+        112272535095293,
+        115280095190773,
+        115797848077099,
+        1099726899285419]
+
+    def is_prime(n):
+        if n % 2 == 0:
+            return False
+
+        sqrt_n = int(math.floor(math.sqrt(n)))
+        for i in range(3, sqrt_n + 1, 2):
+            if n % i == 0:
+                return False
+        return True
+
+    def main():
+        with futures.ProcessPoolExecutor() as executor:
+            for number, prime in zip(PRIMES, executor.map(is_prime,
+                                                          PRIMES)):
+                print('%d is prime: %s' % (number, prime))
+
+    if __name__ == '__main__':
+        main()
+
+Web Crawl Example
+-----------------
+
+::
+
+    from concurrent import futures
+    import urllib.request
+
+    URLS = ['http://www.foxnews.com/',
+            'http://www.cnn.com/',
+            'http://europe.wsj.com/',
+            'http://www.bbc.co.uk/',
+            'http://some-made-up-domain.com/']
+
+    def load_url(url, timeout):
+        return urllib.request.urlopen(url, timeout=timeout).read()
+
+    def main():
+        with futures.ThreadPoolExecutor(max_workers=5) as executor:
+            future_to_url = dict(
+                (executor.submit(load_url, url, 60), url)
+                for url in URLS)
+
+            for future in futures.as_completed(future_to_url):
+                url = future_to_url[future]
+                if future.exception() is not None:
+                    print('%r generated an exception: %s' % (
+                        url, future.exception()))
+                else:
+                    print('%r page is %d bytes' % (
+                        url, len(future.result())))
+
+    if __name__ == '__main__':
+        main()
+
 =========
 Rationale
 =========