Merge pull request #531 from metamx/listenable-prioritized-executorservice

make prioritized executor service listenable
This commit is contained in:
fjy 2014-05-16 16:59:04 -06:00
commit 77289a4148
9 changed files with 225 additions and 204 deletions

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
public abstract class AbstractPrioritizedCallable<V> implements PrioritizedCallable<V>
{
private final int priority;
public AbstractPrioritizedCallable(int priority)
{
this.priority = priority;
}
@Override
public int getPriority()
{
return priority;
}
}

View File

@ -103,7 +103,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Future<List<T>> apply(final QueryRunner<T> input) public Future<List<T>> apply(final QueryRunner<T> input)
{ {
return exec.submit( return exec.submit(
new PrioritizedCallable<List<T>>(priority) new AbstractPrioritizedCallable<List<T>>(priority)
{ {
@Override @Override
public List<T> call() throws Exception public List<T> call() throws Exception

View File

@ -1,127 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
public class DelegatingExecutorService implements ExecutorService
{
private final ExecutorService delegate;
public DelegatingExecutorService(ExecutorService delegate)
{
this.delegate = delegate;
}
@Override
public void shutdown()
{
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow()
{
return delegate.shutdownNow();
}
@Override
public boolean isShutdown()
{
return delegate.isShutdown();
}
@Override
public boolean isTerminated()
{
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{
return delegate.awaitTermination(l, timeUnit);
}
@Override
public <T> Future<T> submit(Callable<T> tCallable)
{
return delegate.submit(tCallable);
}
@Override
public <T> Future<T> submit(Runnable runnable, T t)
{
return delegate.submit(runnable, t);
}
@Override
public Future<?> submit(Runnable runnable)
{
return delegate.submit(runnable);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> callables) throws InterruptedException
{
return delegate.invokeAll(callables);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> callables,
long l,
TimeUnit timeUnit
) throws InterruptedException
{
return delegate.invokeAll(callables, l, timeUnit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException
{
return delegate.invokeAny(callables);
}
@Override
public <T> T invokeAny(
Collection<? extends Callable<T>> callables,
long l,
TimeUnit timeUnit
) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.invokeAny(callables, l, timeUnit);
}
@Override
public void execute(Runnable runnable)
{
delegate.execute(runnable);
}
}

View File

@ -97,7 +97,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
public Future<Boolean> apply(final QueryRunner<Row> input) public Future<Boolean> apply(final QueryRunner<Row> input)
{ {
return exec.submit( return exec.submit(
new PrioritizedCallable<Boolean>(priority) new AbstractPrioritizedCallable<Boolean>(priority)
{ {
@Override @Override
public Boolean call() throws Exception public Boolean call() throws Exception

View File

@ -19,51 +19,57 @@
package io.druid.query; package io.druid.query;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class MetricsEmittingExecutorService extends DelegatingExecutorService public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService
{ {
private final ExecutorService base; private final ListeningExecutorService delegate;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder; private final ServiceMetricEvent.Builder metricBuilder;
public MetricsEmittingExecutorService( public MetricsEmittingExecutorService(
ExecutorService base, ListeningExecutorService delegate,
ServiceEmitter emitter, ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder ServiceMetricEvent.Builder metricBuilder
) )
{ {
super(base); super();
this.base = base; this.delegate = delegate;
this.emitter = emitter; this.emitter = emitter;
this.metricBuilder = metricBuilder; this.metricBuilder = metricBuilder;
} }
@Override @Override
public <T> Future<T> submit(Callable<T> tCallable) protected ListeningExecutorService delegate()
{
return delegate;
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> tCallable)
{ {
emitMetrics(); emitMetrics();
return base.submit(tCallable); return delegate.submit(tCallable);
} }
@Override @Override
public void execute(Runnable runnable) public void execute(Runnable runnable)
{ {
emitMetrics(); emitMetrics();
base.execute(runnable); delegate.execute(runnable);
} }
private void emitMetrics() private void emitMetrics()
{ {
if (base instanceof PrioritizedExecutorService) { if (delegate instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) base).getQueueSize())); emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) delegate).getQueueSize()));
} }
} }
} }

View File

@ -21,19 +21,7 @@ package io.druid.query;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
/** public interface PrioritizedCallable<V> extends Callable<V>
*/
public abstract class PrioritizedCallable<T> implements Callable<T>
{ {
final int priority; public int getPriority();
public PrioritizedCallable(int priority)
{
this.priority = priority;
}
public int getPriority()
{
return priority;
}
} }

View File

@ -19,23 +19,30 @@
package io.druid.query; package io.druid.query;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService
*/
public class PrioritizedExecutorService extends AbstractExecutorService
{ {
public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
{ {
@ -69,95 +76,180 @@ public class PrioritizedExecutorService extends AbstractExecutorService
return service; return service;
} }
private static final int DEFAULT_PRIORITY = 0; private final ListeningExecutorService delegate;
private final BlockingQueue<Runnable> delegateQueue;
private final boolean allowRegularTasks;
private final ThreadPoolExecutor threadPoolExecutor; private final int defaultPriority;
public PrioritizedExecutorService( public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor ThreadPoolExecutor threadPoolExecutor
) )
{ {
this.threadPoolExecutor = threadPoolExecutor; this(threadPoolExecutor, false, 0);
}
public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor,
boolean allowRegularTasks,
int defaultPriority
)
{
this.delegate = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(threadPoolExecutor));
this.delegateQueue = threadPoolExecutor.getQueue();
this.allowRegularTasks = allowRegularTasks;
this.defaultPriority = defaultPriority;
}
@Override protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Runnable runnable, T value) {
Preconditions.checkArgument(allowRegularTasks || runnable instanceof PrioritizedRunnable, "task does not implement PrioritizedRunnable");
return PrioritizedListenableFutureTask.create(ListenableFutureTask.create(runnable, value),
runnable instanceof PrioritizedRunnable
? ((PrioritizedRunnable) runnable).getPriority()
: defaultPriority
);
}
@Override protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Callable<T> callable) {
Preconditions.checkArgument(allowRegularTasks || callable instanceof PrioritizedCallable, "task does not implement PrioritizedCallable");
return PrioritizedListenableFutureTask.create(
ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority()
: defaultPriority
);
}
@Override public ListenableFuture<?> submit(Runnable task) {
return (ListenableFuture<?>) super.submit(task);
}
@Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {
return (ListenableFuture<T>) super.submit(task, result);
}
@Override public <T> ListenableFuture<T> submit(Callable<T> task) {
return (ListenableFuture<T>) super.submit(task);
} }
@Override @Override
public void shutdown() public void shutdown()
{ {
threadPoolExecutor.shutdown(); delegate.shutdown();
} }
@Override @Override
public List<Runnable> shutdownNow() public List<Runnable> shutdownNow()
{ {
return threadPoolExecutor.shutdownNow(); return delegate.shutdownNow();
} }
@Override @Override
public boolean isShutdown() public boolean isShutdown()
{ {
return threadPoolExecutor.isShutdown(); return delegate.isShutdown();
} }
@Override @Override
public boolean isTerminated() public boolean isTerminated()
{ {
return threadPoolExecutor.isTerminated(); return delegate.isTerminated();
} }
@Override @Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{ {
return threadPoolExecutor.awaitTermination(l, timeUnit); return delegate.awaitTermination(l, timeUnit);
} }
@Override @Override
public void execute(Runnable runnable) public void execute(final Runnable runnable)
{ {
threadPoolExecutor.execute(runnable); delegate.execute(runnable);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> tCallable)
{
Callable<T> theCallable = tCallable;
if (!(tCallable instanceof PrioritizedCallable)) {
theCallable = new PrioritizedCallable<T>(DEFAULT_PRIORITY)
{
@Override
public T call() throws Exception
{
return tCallable.call();
}
};
}
return new PrioritizedFuture<T>((PrioritizedCallable) theCallable);
} }
public int getQueueSize() public int getQueueSize()
{ {
return threadPoolExecutor.getQueue().size(); return delegateQueue.size();
} }
private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture>
{
private final PrioritizedCallable<V> callable;
public PrioritizedFuture(PrioritizedCallable<V> callable) public static class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>, ListenableFuture<V>, PrioritizedRunnable, Comparable<PrioritizedListenableFutureTask>
{
public static <V> PrioritizedListenableFutureTask<V> create(PrioritizedRunnable task, @Nullable V result)
{ {
super(callable); return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(task, result), task.getPriority());
this.callable = callable;
} }
public int getPriority() public static <V> PrioritizedListenableFutureTask<?> create(PrioritizedCallable<V> callable)
{ {
return callable.getPriority(); return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(callable), callable.getPriority());
}
public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority)
{
return new PrioritizedListenableFutureTask<>(task, priority);
}
private final ListenableFutureTask<V> delegate;
private final int priority;
PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority)
{
this.delegate = delegate;
this.priority = priority;
} }
@Override @Override
public int compareTo(PrioritizedFuture future) public void run()
{ {
return -Ints.compare(getPriority(), future.getPriority()); delegate.run();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}
@Override
public boolean isDone()
{
return delegate.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException
{
return delegate.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}
@Override
public void addListener(Runnable listener, Executor executor)
{
delegate.addListener(listener, executor);
}
@Override
public int getPriority()
{
return priority;
}
@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
return -Ints.compare(getPriority(), otherTask.getPriority());
} }
} }
} }

View File

@ -0,0 +1,25 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
public interface PrioritizedRunnable extends Runnable
{
public int getPriority();
}

View File

@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -76,7 +77,7 @@ public class PrioritizedExecutorServiceTest
final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>(); final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
exec.submit( exec.submit(
new PrioritizedCallable<Void>(0) new AbstractPrioritizedCallable<Void>(0)
{ {
@Override @Override
public Void call() throws Exception public Void call() throws Exception
@ -88,7 +89,7 @@ public class PrioritizedExecutorServiceTest
); );
exec.submit( exec.submit(
new PrioritizedCallable<Void>(-1) new AbstractPrioritizedCallable<Void>(-1)
{ {
@Override @Override
public Void call() throws Exception public Void call() throws Exception
@ -100,7 +101,7 @@ public class PrioritizedExecutorServiceTest
} }
); );
exec.submit( exec.submit(
new PrioritizedCallable<Void>(0) new AbstractPrioritizedCallable<Void>(0)
{ {
@Override @Override
public Void call() throws Exception public Void call() throws Exception
@ -112,7 +113,7 @@ public class PrioritizedExecutorServiceTest
} }
); );
exec.submit( exec.submit(
new PrioritizedCallable<Void>(2) new AbstractPrioritizedCallable<Void>(2)
{ {
@Override @Override
public Void call() throws Exception public Void call() throws Exception