Make PrioritizedExecutorService optionally FIFO

This commit is contained in:
Charles Allen 2015-10-07 10:10:57 -07:00
parent 092b5b19d2
commit ecdafa87c5
9 changed files with 418 additions and 91 deletions

View File

@ -58,6 +58,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
#### General Query Configuration

View File

@ -56,6 +56,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
#### General Query Configuration

View File

@ -42,4 +42,10 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
{
return 0;
}
@Config(value = "${base_path}.fifo")
public boolean isFifo()
{
return false;
}
}

View File

@ -21,5 +21,5 @@ import java.util.concurrent.Callable;
public interface PrioritizedCallable<V> extends Callable<V>
{
public int getPriority();
int getPriority();
}

View File

@ -18,16 +18,16 @@
package io.druid.query;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
@ -39,10 +39,11 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService
{
public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
public static PrioritizedExecutorService create(Lifecycle lifecycle, DruidProcessingConfig config)
{
final PrioritizedExecutorService service = new PrioritizedExecutorService(
new ThreadPoolExecutor(
@ -52,7 +53,8 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
)
),
config
);
lifecycle.addHandler(
@ -74,28 +76,35 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
return service;
}
private final AtomicLong queuePosition = new AtomicLong(Long.MAX_VALUE);
private final ListeningExecutorService delegate;
private final BlockingQueue<Runnable> delegateQueue;
private final boolean allowRegularTasks;
private final int defaultPriority;
private final DruidProcessingConfig config;
final ThreadPoolExecutor threadPoolExecutor; // Used in unit tests
public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor
ThreadPoolExecutor threadPoolExecutor,
DruidProcessingConfig config
)
{
this(threadPoolExecutor, false, 0);
this(threadPoolExecutor, false, 0, config);
}
public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor,
boolean allowRegularTasks,
int defaultPriority
int defaultPriority,
DruidProcessingConfig config
)
{
this.threadPoolExecutor = threadPoolExecutor;
this.delegate = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(threadPoolExecutor));
this.delegateQueue = threadPoolExecutor.getQueue();
this.allowRegularTasks = allowRegularTasks;
this.defaultPriority = defaultPriority;
this.config = config;
}
@Override
@ -109,7 +118,8 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
ListenableFutureTask.create(runnable, value),
runnable instanceof PrioritizedRunnable
? ((PrioritizedRunnable) runnable).getPriority()
: defaultPriority
: defaultPriority,
config.isFifo() ? queuePosition.decrementAndGet() : 0
);
}
@ -121,9 +131,11 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
"task does not implement PrioritizedCallable"
);
return PrioritizedListenableFutureTask.create(
ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority()
: defaultPriority
ListenableFutureTask.create(callable),
callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority()
: defaultPriority,
config.isFifo() ? queuePosition.decrementAndGet() : 0
);
}
@ -185,89 +197,128 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
{
return delegateQueue.size();
}
}
public static class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
ListenableFuture<V>,
PrioritizedRunnable,
Comparable<PrioritizedListenableFutureTask>
class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
ListenableFuture<V>,
PrioritizedRunnable,
Comparable<PrioritizedListenableFutureTask>
{
// NOTE: For priority HIGHER numeric value means more priority. As such we swap left and right in the compares
private static final Comparator<PrioritizedListenableFutureTask> PRIORITY_COMPARATOR = new Ordering<PrioritizedListenableFutureTask>()
{
public static <V> PrioritizedListenableFutureTask<V> create(PrioritizedRunnable task, @Nullable V result)
{
return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(task, result), task.getPriority());
}
public static <V> PrioritizedListenableFutureTask<?> create(PrioritizedCallable<V> callable)
{
return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(callable), callable.getPriority());
}
public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority)
{
return new PrioritizedListenableFutureTask<>(task, priority);
}
private final ListenableFutureTask<V> delegate;
private final int priority;
PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority)
{
this.delegate = delegate;
this.priority = priority;
}
@Override
public void run()
public int compare(
PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right
)
{
delegate.run();
return Integer.compare(right.getPriority(), left.getPriority());
}
}.compound(
new Ordering<PrioritizedListenableFutureTask>()
{
@Override
public int compare(PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right)
{
return Long.compare(right.getInsertionPlace(), left.getInsertionPlace());
}
}
);
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegate.cancel(mayInterruptIfRunning);
}
public static <V> PrioritizedListenableFutureTask<V> create(
PrioritizedRunnable task,
@Nullable V result,
long position
)
{
return new PrioritizedListenableFutureTask<>(
ListenableFutureTask.create(task, result),
task.getPriority(),
position
);
}
@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}
public static <V> PrioritizedListenableFutureTask<?> create(PrioritizedCallable<V> callable, long position)
{
return new PrioritizedListenableFutureTask<>(
ListenableFutureTask.create(callable),
callable.getPriority(),
position
);
}
@Override
public boolean isDone()
{
return delegate.isDone();
}
public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority, long position)
{
return new PrioritizedListenableFutureTask<>(task, priority, position);
}
@Override
public V get() throws InterruptedException, ExecutionException
{
return delegate.get();
}
private final ListenableFutureTask<V> delegate;
private final int priority;
private final long insertionPlace;
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}
PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority, long position)
{
this.delegate = delegate;
this.priority = priority;
this.insertionPlace = position; // Long.MAX_VALUE will always be "highest"
}
@Override
public void addListener(Runnable listener, Executor executor)
{
delegate.addListener(listener, executor);
}
@Override
public void run()
{
delegate.run();
}
@Override
public int getPriority()
{
return priority;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
return Ints.compare(otherTask.getPriority(), getPriority());
}
@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}
@Override
public boolean isDone()
{
return delegate.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException
{
return delegate.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}
@Override
public void addListener(Runnable listener, Executor executor)
{
delegate.addListener(listener, executor);
}
@Override
public int getPriority()
{
return priority;
}
protected long getInsertionPlace()
{
return insertionPlace;
}
@Override
public int compareTo(PrioritizedListenableFutureTask otherTask)
{
return PRIORITY_COMPARATOR.compare(this, otherTask);
}
}

View File

@ -19,5 +19,5 @@ package io.druid.query;
public interface PrioritizedRunnable extends Runnable
{
public int getPriority();
int getPriority();
}

View File

@ -62,7 +62,7 @@ public class ChainedExecutionQueryRunnerTest
public void testQueryCancellation() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
new Lifecycle(), new DruidProcessingConfig()
{
@Override
public String getFormatString()
@ -189,7 +189,7 @@ public class ChainedExecutionQueryRunnerTest
public void testQueryTimeout() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
new Lifecycle(), new DruidProcessingConfig()
{
@Override
public String getFormatString()

View File

@ -17,32 +17,70 @@
package io.druid.query;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.lifecycle.Lifecycle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@RunWith(Parameterized.class)
public class PrioritizedExecutorServiceTest
{
private ExecutorService exec;
private PrioritizedExecutorService exec;
private CountDownLatch latch;
private CountDownLatch finishLatch;
private final boolean useFifo;
private final DruidProcessingConfig config;
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{true}, new Object[]{false});
}
public PrioritizedExecutorServiceTest(final boolean useFifo)
{
this.useFifo = useFifo;
this.config = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public boolean isFifo()
{
return useFifo;
}
};
}
@Before
public void setUp() throws Exception
{
exec = PrioritizedExecutorService.create(
new Lifecycle(),
new ExecutorServiceConfig()
new DruidProcessingConfig()
{
@Override
public String getFormatString()
@ -55,6 +93,12 @@ public class PrioritizedExecutorServiceTest
{
return 1;
}
@Override
public boolean isFifo()
{
return useFifo;
}
}
);
@ -62,6 +106,12 @@ public class PrioritizedExecutorServiceTest
finishLatch = new CountDownLatch(3);
}
@After
public void tearDown()
{
exec.shutdownNow();
}
/**
* Submits a normal priority task to block the queue, followed by low, high, normal priority tasks.
* Tests to see that the high priority task is executed first, followed by the normal and low priority tasks.
@ -130,4 +180,222 @@ public class PrioritizedExecutorServiceTest
List<Integer> expected = ImmutableList.of(2, 0, -1);
Assert.assertEquals(expected, ImmutableList.copyOf(order));
}
// Make sure entries are processed FIFO
@Test
public void testOrderedExecutionEqualPriorityRunnable() throws ExecutionException, InterruptedException
{
final int numTasks = 100;
final List<ListenableFuture<?>> futures = Lists.newArrayListWithExpectedSize(numTasks);
final AtomicInteger hasRun = new AtomicInteger(0);
for (int i = 0; i < numTasks; ++i) {
futures.add(exec.submit(getCheckingPrioritizedRunnable(i, hasRun)));
}
latch.countDown();
checkFutures(futures);
}
@Test
public void testOrderedExecutionEqualPriorityCallable() throws ExecutionException, InterruptedException
{
final int numTasks = 1_000;
final List<ListenableFuture<?>> futures = Lists.newArrayListWithExpectedSize(numTasks);
final AtomicInteger hasRun = new AtomicInteger(0);
for (int i = 0; i < numTasks; ++i) {
futures.add(exec.submit(getCheckingPrioritizedCallable(i, hasRun)));
}
latch.countDown();
checkFutures(futures);
}
@Test
public void testOrderedExecutionEqualPriorityMix() throws ExecutionException, InterruptedException
{
exec = new PrioritizedExecutorService(
exec.threadPoolExecutor, true, 0, config
);
final int numTasks = 1_000;
final List<ListenableFuture<?>> futures = Lists.newArrayListWithExpectedSize(numTasks);
final AtomicInteger hasRun = new AtomicInteger(0);
final Random random = new Random(789401);
for (int i = 0; i < numTasks; ++i) {
switch (random.nextInt(4)) {
case 0:
futures.add(exec.submit(getCheckingPrioritizedCallable(i, hasRun)));
break;
case 1:
futures.add(exec.submit(getCheckingPrioritizedRunnable(i, hasRun)));
break;
case 2:
futures.add(exec.submit(getCheckingCallable(i, hasRun)));
break;
case 3:
futures.add(exec.submit(getCheckingRunnable(i, hasRun)));
break;
default:
Assert.fail("Bad random result");
}
}
latch.countDown();
checkFutures(futures);
}
@Test
public void testOrderedExecutionMultiplePriorityMix() throws ExecutionException, InterruptedException
{
final int DEFAULT = 0;
final int MIN = -1;
final int MAX = 1;
exec = new PrioritizedExecutorService(exec.threadPoolExecutor, true, DEFAULT, config);
final int numTasks = 999;
final int[] priorities = new int[]{MAX, DEFAULT, MIN};
final int tasksPerPriority = numTasks / priorities.length;
final int[] priorityOffsets = new int[]{0, tasksPerPriority, tasksPerPriority * 2};
final List<ListenableFuture<?>> futures = Lists.newArrayListWithExpectedSize(numTasks);
final AtomicInteger hasRun = new AtomicInteger(0);
final Random random = new Random(789401);
for (int i = 0; i < numTasks; ++i) {
final int priorityBucket = i % priorities.length;
final int myPriority = priorities[priorityBucket];
final int priorityOffset = priorityOffsets[priorityBucket];
final int expectedPriorityOrder = i / priorities.length;
if (random.nextBoolean()) {
futures.add(
exec.submit(
getCheckingPrioritizedCallable(
priorityOffset + expectedPriorityOrder,
hasRun,
myPriority
)
)
);
} else {
futures.add(
exec.submit(
getCheckingPrioritizedRunnable(
priorityOffset + expectedPriorityOrder,
hasRun,
myPriority
)
)
);
}
}
latch.countDown();
checkFutures(futures);
}
private void checkFutures(Iterable<ListenableFuture<?>> futures) throws InterruptedException, ExecutionException
{
for (ListenableFuture<?> future : futures) {
try {
future.get();
}
catch (ExecutionException e) {
if (!(e.getCause() instanceof AssumptionViolatedException)) {
throw e;
}
}
}
}
private PrioritizedCallable<Boolean> getCheckingPrioritizedCallable(
final int myOrder,
final AtomicInteger hasRun
)
{
return getCheckingPrioritizedCallable(myOrder, hasRun, 0);
}
private PrioritizedCallable<Boolean> getCheckingPrioritizedCallable(
final int myOrder,
final AtomicInteger hasRun,
final int priority
)
{
final Callable<Boolean> delegate = getCheckingCallable(myOrder, hasRun);
return new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
return delegate.call();
}
};
}
private Callable<Boolean> getCheckingCallable(
final int myOrder,
final AtomicInteger hasRun
)
{
final Runnable runnable = getCheckingRunnable(myOrder, hasRun);
return new Callable<Boolean>()
{
@Override
public Boolean call()
{
runnable.run();
return true;
}
};
}
private Runnable getCheckingRunnable(
final int myOrder,
final AtomicInteger hasRun
)
{
return new Runnable()
{
@Override
public void run()
{
try {
latch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
if (useFifo) {
Assert.assertEquals(myOrder, hasRun.getAndIncrement());
} else {
Assume.assumeTrue(Integer.compare(myOrder, hasRun.getAndIncrement()) == 0);
}
}
};
}
private PrioritizedRunnable getCheckingPrioritizedRunnable(
final int myOrder,
final AtomicInteger hasRun
)
{
return getCheckingPrioritizedRunnable(myOrder, hasRun, 0);
}
private PrioritizedRunnable getCheckingPrioritizedRunnable(
final int myOrder,
final AtomicInteger hasRun,
final int priority
)
{
final Runnable delegate = getCheckingRunnable(myOrder, hasRun);
return new PrioritizedRunnable()
{
@Override
public int getPriority()
{
return priority;
}
@Override
public void run()
{
delegate.run();
}
};
}
}

View File

@ -82,7 +82,7 @@ public class DruidProcessingModule implements Module
@Processing
@ManageLifecycle
public ExecutorService getProcessingExecutorService(
ExecutorServiceConfig config,
DruidProcessingConfig config,
ServiceEmitter emitter,
Lifecycle lifecycle
)