diff --git a/processing/src/main/java/io/druid/query/AbstractPrioritizedCallable.java b/processing/src/main/java/io/druid/query/AbstractPrioritizedCallable.java new file mode 100644 index 00000000000..e3371d1210f --- /dev/null +++ b/processing/src/main/java/io/druid/query/AbstractPrioritizedCallable.java @@ -0,0 +1,36 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +public abstract class AbstractPrioritizedCallable implements PrioritizedCallable +{ + private final int priority; + + public AbstractPrioritizedCallable(int priority) + { + this.priority = priority; + } + + @Override + public int getPriority() + { + return priority; + } +} diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 6d7d1ea25b5..31eac12702e 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -103,7 +103,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Future> apply(final QueryRunner input) { return exec.submit( - new PrioritizedCallable>(priority) + new AbstractPrioritizedCallable>(priority) { @Override public List call() throws Exception diff --git a/processing/src/main/java/io/druid/query/DelegatingExecutorService.java b/processing/src/main/java/io/druid/query/DelegatingExecutorService.java deleted file mode 100644 index 5a6be3f854c..00000000000 --- a/processing/src/main/java/io/druid/query/DelegatingExecutorService.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.query; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - */ -public class DelegatingExecutorService implements ExecutorService -{ - private final ExecutorService delegate; - - public DelegatingExecutorService(ExecutorService delegate) - { - this.delegate = delegate; - } - - @Override - public void shutdown() - { - delegate.shutdown(); - } - - @Override - public List shutdownNow() - { - return delegate.shutdownNow(); - } - - @Override - public boolean isShutdown() - { - return delegate.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return delegate.isTerminated(); - } - - @Override - public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException - { - return delegate.awaitTermination(l, timeUnit); - } - - @Override - public Future submit(Callable tCallable) - { - return delegate.submit(tCallable); - } - - @Override - public Future submit(Runnable runnable, T t) - { - return delegate.submit(runnable, t); - } - - @Override - public Future submit(Runnable runnable) - { - return delegate.submit(runnable); - } - - @Override - public List> invokeAll(Collection> callables) throws InterruptedException - { - return delegate.invokeAll(callables); - } - - @Override - public List> invokeAll( - Collection> callables, - long l, - TimeUnit timeUnit - ) throws InterruptedException - { - return delegate.invokeAll(callables, l, timeUnit); - } - - @Override - public T invokeAny(Collection> callables) throws InterruptedException, ExecutionException - { - return delegate.invokeAny(callables); - } - - @Override - public T invokeAny( - Collection> callables, - long l, - TimeUnit timeUnit - ) throws InterruptedException, ExecutionException, TimeoutException - { - return delegate.invokeAny(callables, l, timeUnit); - } - - @Override - public void execute(Runnable runnable) - { - delegate.execute(runnable); - } -} diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 20817a772e5..51c663c6a2e 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -97,7 +97,7 @@ public class GroupByParallelQueryRunner implements QueryRunner public Future apply(final QueryRunner input) { return exec.submit( - new PrioritizedCallable(priority) + new AbstractPrioritizedCallable(priority) { @Override public Boolean call() throws Exception diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java index 37ed9e81e12..e581e77399d 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java @@ -19,51 +19,57 @@ package io.druid.query; +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -public class MetricsEmittingExecutorService extends DelegatingExecutorService +public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService { - private final ExecutorService base; + private final ListeningExecutorService delegate; private final ServiceEmitter emitter; private final ServiceMetricEvent.Builder metricBuilder; public MetricsEmittingExecutorService( - ExecutorService base, + ListeningExecutorService delegate, ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder ) { - super(base); + super(); - this.base = base; + this.delegate = delegate; this.emitter = emitter; this.metricBuilder = metricBuilder; } @Override - public Future submit(Callable tCallable) + protected ListeningExecutorService delegate() + { + return delegate; + } + + @Override + public ListenableFuture submit(Callable tCallable) { emitMetrics(); - return base.submit(tCallable); + return delegate.submit(tCallable); } @Override public void execute(Runnable runnable) { emitMetrics(); - base.execute(runnable); + delegate.execute(runnable); } private void emitMetrics() { - if (base instanceof PrioritizedExecutorService) { - emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) base).getQueueSize())); + if (delegate instanceof PrioritizedExecutorService) { + emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) delegate).getQueueSize())); } } } diff --git a/processing/src/main/java/io/druid/query/PrioritizedCallable.java b/processing/src/main/java/io/druid/query/PrioritizedCallable.java index e664a8d682b..fb61da9f70c 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedCallable.java +++ b/processing/src/main/java/io/druid/query/PrioritizedCallable.java @@ -21,19 +21,7 @@ package io.druid.query; import java.util.concurrent.Callable; -/** - */ -public abstract class PrioritizedCallable implements Callable +public interface PrioritizedCallable extends Callable { - final int priority; - - public PrioritizedCallable(int priority) - { - this.priority = priority; - } - - public int getPriority() - { - return priority; - } + public int getPriority(); } diff --git a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java index 95f465a4edf..c0dd98b6718 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java +++ b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java @@ -19,23 +19,30 @@ package io.druid.query; +import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.lifecycle.Lifecycle; +import javax.annotation.Nullable; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; -/** - */ -public class PrioritizedExecutorService extends AbstractExecutorService +public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService { public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) { @@ -69,95 +76,180 @@ public class PrioritizedExecutorService extends AbstractExecutorService return service; } - private static final int DEFAULT_PRIORITY = 0; - - - private final ThreadPoolExecutor threadPoolExecutor; + private final ListeningExecutorService delegate; + private final BlockingQueue delegateQueue; + private final boolean allowRegularTasks; + private final int defaultPriority; public PrioritizedExecutorService( ThreadPoolExecutor threadPoolExecutor ) { - this.threadPoolExecutor = threadPoolExecutor; + this(threadPoolExecutor, false, 0); + } + + public PrioritizedExecutorService( + ThreadPoolExecutor threadPoolExecutor, + boolean allowRegularTasks, + int defaultPriority + ) + { + this.delegate = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(threadPoolExecutor)); + this.delegateQueue = threadPoolExecutor.getQueue(); + this.allowRegularTasks = allowRegularTasks; + this.defaultPriority = defaultPriority; + } + + @Override protected PrioritizedListenableFutureTask newTaskFor(Runnable runnable, T value) { + Preconditions.checkArgument(allowRegularTasks || runnable instanceof PrioritizedRunnable, "task does not implement PrioritizedRunnable"); + return PrioritizedListenableFutureTask.create(ListenableFutureTask.create(runnable, value), + runnable instanceof PrioritizedRunnable + ? ((PrioritizedRunnable) runnable).getPriority() + : defaultPriority + ); + } + + @Override protected PrioritizedListenableFutureTask newTaskFor(Callable callable) { + Preconditions.checkArgument(allowRegularTasks || callable instanceof PrioritizedCallable, "task does not implement PrioritizedCallable"); + return PrioritizedListenableFutureTask.create( + ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable + ? ((PrioritizedCallable) callable).getPriority() + : defaultPriority + ); + } + + @Override public ListenableFuture submit(Runnable task) { + return (ListenableFuture) super.submit(task); + } + + @Override public ListenableFuture submit(Runnable task, @Nullable T result) { + return (ListenableFuture) super.submit(task, result); + } + + @Override public ListenableFuture submit(Callable task) { + return (ListenableFuture) super.submit(task); } @Override public void shutdown() { - threadPoolExecutor.shutdown(); + delegate.shutdown(); } @Override public List shutdownNow() { - return threadPoolExecutor.shutdownNow(); + return delegate.shutdownNow(); } @Override public boolean isShutdown() { - return threadPoolExecutor.isShutdown(); + return delegate.isShutdown(); } @Override public boolean isTerminated() { - return threadPoolExecutor.isTerminated(); + return delegate.isTerminated(); } @Override public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException { - return threadPoolExecutor.awaitTermination(l, timeUnit); + return delegate.awaitTermination(l, timeUnit); } @Override - public void execute(Runnable runnable) + public void execute(final Runnable runnable) { - threadPoolExecutor.execute(runnable); - } - - @Override - protected RunnableFuture newTaskFor(final Callable tCallable) - { - Callable theCallable = tCallable; - if (!(tCallable instanceof PrioritizedCallable)) { - theCallable = new PrioritizedCallable(DEFAULT_PRIORITY) - { - @Override - public T call() throws Exception - { - return tCallable.call(); - } - }; - } - return new PrioritizedFuture((PrioritizedCallable) theCallable); + delegate.execute(runnable); } public int getQueueSize() { - return threadPoolExecutor.getQueue().size(); + return delegateQueue.size(); } - private static class PrioritizedFuture extends FutureTask implements Comparable - { - private final PrioritizedCallable callable; - public PrioritizedFuture(PrioritizedCallable callable) + public static class PrioritizedListenableFutureTask implements RunnableFuture, ListenableFuture, PrioritizedRunnable, Comparable + { + public static PrioritizedListenableFutureTask create(PrioritizedRunnable task, @Nullable V result) { - super(callable); - this.callable = callable; + return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(task, result), task.getPriority()); } - public int getPriority() + public static PrioritizedListenableFutureTask create(PrioritizedCallable callable) { - return callable.getPriority(); + return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(callable), callable.getPriority()); + } + + public static PrioritizedListenableFutureTask create(ListenableFutureTask task, int priority) + { + return new PrioritizedListenableFutureTask<>(task, priority); + } + + private final ListenableFutureTask delegate; + private final int priority; + + PrioritizedListenableFutureTask(ListenableFutureTask delegate, int priority) + { + this.delegate = delegate; + this.priority = priority; } @Override - public int compareTo(PrioritizedFuture future) + public void run() { - return -Ints.compare(getPriority(), future.getPriority()); + delegate.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() + { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() + { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException + { + return delegate.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.get(timeout, unit); + } + + @Override + public void addListener(Runnable listener, Executor executor) + { + delegate.addListener(listener, executor); + } + + @Override + public int getPriority() + { + return priority; + } + + @Override + public int compareTo(PrioritizedListenableFutureTask otherTask) + { + return -Ints.compare(getPriority(), otherTask.getPriority()); } } } diff --git a/processing/src/main/java/io/druid/query/PrioritizedRunnable.java b/processing/src/main/java/io/druid/query/PrioritizedRunnable.java new file mode 100644 index 00000000000..6e277b20423 --- /dev/null +++ b/processing/src/main/java/io/druid/query/PrioritizedRunnable.java @@ -0,0 +1,25 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +public interface PrioritizedRunnable extends Runnable +{ + public int getPriority(); +} diff --git a/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java b/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java index 0209fafa5ec..1d6f8a2ae07 100644 --- a/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java +++ b/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java @@ -27,6 +27,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -76,7 +77,7 @@ public class PrioritizedExecutorServiceTest final ConcurrentLinkedQueue order = new ConcurrentLinkedQueue(); exec.submit( - new PrioritizedCallable(0) + new AbstractPrioritizedCallable(0) { @Override public Void call() throws Exception @@ -88,7 +89,7 @@ public class PrioritizedExecutorServiceTest ); exec.submit( - new PrioritizedCallable(-1) + new AbstractPrioritizedCallable(-1) { @Override public Void call() throws Exception @@ -100,7 +101,7 @@ public class PrioritizedExecutorServiceTest } ); exec.submit( - new PrioritizedCallable(0) + new AbstractPrioritizedCallable(0) { @Override public Void call() throws Exception @@ -112,7 +113,7 @@ public class PrioritizedExecutorServiceTest } ); exec.submit( - new PrioritizedCallable(2) + new AbstractPrioritizedCallable(2) { @Override public Void call() throws Exception