From 57071d7ad4975f9fcae57d2f4355f8bb3b425c72 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 30 Apr 2010 03:05:11 +0300 Subject: [PATCH] relax the ThreadPool interface --- .../elasticsearch/threadpool/ThreadPool.java | 165 +++++++++++++++++- .../support/AbstractThreadPool.java | 35 +--- 2 files changed, 162 insertions(+), 38 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 32da99a2c03..df0cc688d89 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -21,24 +21,177 @@ package org.elasticsearch.threadpool; import org.elasticsearch.util.TimeValue; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ -public interface ThreadPool extends ScheduledExecutorService { +public interface ThreadPool { boolean isStarted(); + /** + * Attempts to stop all actively executing tasks, halts the + * processing of waiting tasks, and returns a list of the tasks that were + * awaiting execution. + * + *

There are no guarantees beyond best-effort attempts to stop + * processing actively executing tasks. For example, typical + * implementations will cancel via {@link Thread#interrupt}, so any + * task that fails to respond to interrupts may never terminate. + */ + void shutdownNow(); + + /** + * Initiates an orderly shutdown in which previously submitted + * tasks are executed, but no new tasks will be accepted. + * Invocation has no additional effect if already shut down. + */ + void shutdown(); + + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; + + void execute(Runnable command); + + /** + * Submits a value-returning task for execution and returns a + * Future representing the pending results of the task. The + * Future's get method will return the task's result upon + * successful completion. + * + *

+ * If you would like to immediately block waiting + * for a task, you can use constructions of the form + * result = exec.submit(aCallable).get(); + * + *

Note: The {@link Executors} class includes a set of methods + * that can convert some other common closure-like objects, + * for example, {@link java.security.PrivilegedAction} to + * {@link Callable} form so they can be submitted. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null + */ + Future submit(Callable task); + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's get method will + * return the given result upon successful completion. + * + * @param task the task to submit + * @param result the result to return + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null + */ + Future submit(Runnable task, T result); + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's get method will + * return null upon successful completion. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null + */ + Future submit(Runnable task); + Future submit(Callable task, FutureListener listener); Future submit(Runnable task, T result, FutureListener listener); Future submit(Runnable task, FutureListener listener); + /** + * Creates and executes a one-shot action that becomes enabled + * after the given delay. + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of + * the task and whose get() method will return + * null upon completion + * @throws java.util.concurrent.RejectedExecutionException + * if the task cannot be + * scheduled for execution + * @throws NullPointerException if command is null + */ + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + /** + * Creates and executes a ScheduledFuture that becomes enabled after the + * given delay. + * + * @param callable the function to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture that can be used to extract result or cancel + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if callable is null + */ + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the given + * period; that is executions will commence after + * initialDelay then initialDelay+period, then + * initialDelay + 2 * period, and so on. + * If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. If any execution of this task + * takes longer than its period, then subsequent executions + * may start late, but will not concurrently execute. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param period the period between successive executions + * @param unit the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose get() method will throw an + * exception upon cancellation + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if period less than or equal to zero + */ + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose get() method will throw an + * exception upon cancellation + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if command is null + * @throws IllegalArgumentException if delay less than or equal to zero + */ + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); + public ScheduledFuture schedule(Runnable command, TimeValue delay); ScheduledFuture scheduleWithFixedDelay(Runnable command, TimeValue interval); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java index 1fb55ae6fbc..1dd5d678b23 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/support/AbstractThreadPool.java @@ -25,9 +25,6 @@ import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.settings.Settings; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.concurrent.*; /** @@ -74,20 +71,10 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th scheduledExecutorService.shutdown(); } - @Override public List shutdownNow() { + @Override public void shutdownNow() { started = false; - List result = new ArrayList(); - result.addAll(executorService.shutdownNow()); - result.addAll(scheduledExecutorService.shutdownNow()); - return result; - } - - @Override public boolean isShutdown() { - return executorService.isShutdown() || scheduledExecutorService.isShutdown(); - } - - @Override public boolean isTerminated() { - return executorService.isTerminated() || scheduledExecutorService.isTerminated(); + executorService.shutdownNow(); + scheduledExecutorService.shutdownNow(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { @@ -128,22 +115,6 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); } - @Override public List> invokeAll(Collection> tasks) throws InterruptedException { - return executorService.invokeAll(tasks); - } - - @Override public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return executorService.invokeAll(tasks, timeout, unit); - } - - @Override public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return executorService.invokeAny(tasks); - } - - @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return executorService.invokeAny(tasks, timeout, unit); - } - @Override public void execute(Runnable command) { executorService.execute(command); }