diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 55bac52b882..2df800f16b7 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -58,6 +58,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally, |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| +|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| #### General Query Configuration diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index edd524edab8..86c6330ffb7 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -56,6 +56,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| +|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| #### General Query Configuration diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java index 365868bb90b..fcbc1ae174e 100644 --- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java @@ -42,4 +42,10 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem { return 0; } + + @Config(value = "${base_path}.fifo") + public boolean isFifo() + { + return false; + } } diff --git a/processing/src/main/java/io/druid/query/PrioritizedCallable.java b/processing/src/main/java/io/druid/query/PrioritizedCallable.java index 3608584e12c..346ddadac84 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedCallable.java +++ b/processing/src/main/java/io/druid/query/PrioritizedCallable.java @@ -21,5 +21,5 @@ import java.util.concurrent.Callable; public interface PrioritizedCallable extends Callable { - public int getPriority(); + int getPriority(); } diff --git a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java index e2cb9ef7480..cc1b1eb91c8 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java +++ b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java @@ -18,16 +18,16 @@ package io.druid.query; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; +import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.lifecycle.Lifecycle; import javax.annotation.Nullable; +import java.util.Comparator; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; @@ -39,10 +39,11 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService { - public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) + public static PrioritizedExecutorService create(Lifecycle lifecycle, DruidProcessingConfig config) { final PrioritizedExecutorService service = new PrioritizedExecutorService( new ThreadPoolExecutor( @@ -52,7 +53,8 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen TimeUnit.MILLISECONDS, new PriorityBlockingQueue(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build() - ) + ), + config ); lifecycle.addHandler( @@ -74,28 +76,35 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen return service; } + private final AtomicLong queuePosition = new AtomicLong(Long.MAX_VALUE); private final ListeningExecutorService delegate; private final BlockingQueue delegateQueue; private final boolean allowRegularTasks; private final int defaultPriority; + private final DruidProcessingConfig config; + final ThreadPoolExecutor threadPoolExecutor; // Used in unit tests public PrioritizedExecutorService( - ThreadPoolExecutor threadPoolExecutor + ThreadPoolExecutor threadPoolExecutor, + DruidProcessingConfig config ) { - this(threadPoolExecutor, false, 0); + this(threadPoolExecutor, false, 0, config); } public PrioritizedExecutorService( ThreadPoolExecutor threadPoolExecutor, boolean allowRegularTasks, - int defaultPriority + int defaultPriority, + DruidProcessingConfig config ) { + this.threadPoolExecutor = threadPoolExecutor; this.delegate = MoreExecutors.listeningDecorator(Preconditions.checkNotNull(threadPoolExecutor)); this.delegateQueue = threadPoolExecutor.getQueue(); this.allowRegularTasks = allowRegularTasks; this.defaultPriority = defaultPriority; + this.config = config; } @Override @@ -109,7 +118,8 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen ListenableFutureTask.create(runnable, value), runnable instanceof PrioritizedRunnable ? ((PrioritizedRunnable) runnable).getPriority() - : defaultPriority + : defaultPriority, + config.isFifo() ? queuePosition.decrementAndGet() : 0 ); } @@ -121,9 +131,11 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen "task does not implement PrioritizedCallable" ); return PrioritizedListenableFutureTask.create( - ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable - ? ((PrioritizedCallable) callable).getPriority() - : defaultPriority + ListenableFutureTask.create(callable), + callable instanceof PrioritizedCallable + ? ((PrioritizedCallable) callable).getPriority() + : defaultPriority, + config.isFifo() ? queuePosition.decrementAndGet() : 0 ); } @@ -185,89 +197,128 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen { return delegateQueue.size(); } +} - - public static class PrioritizedListenableFutureTask implements RunnableFuture, - ListenableFuture, - PrioritizedRunnable, - Comparable +class PrioritizedListenableFutureTask implements RunnableFuture, + ListenableFuture, + PrioritizedRunnable, + Comparable +{ + // NOTE: For priority HIGHER numeric value means more priority. As such we swap left and right in the compares + private static final Comparator PRIORITY_COMPARATOR = new Ordering() { - public static PrioritizedListenableFutureTask create(PrioritizedRunnable task, @Nullable V result) - { - return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(task, result), task.getPriority()); - } - - public static PrioritizedListenableFutureTask create(PrioritizedCallable callable) - { - return new PrioritizedListenableFutureTask<>(ListenableFutureTask.create(callable), callable.getPriority()); - } - - public static PrioritizedListenableFutureTask create(ListenableFutureTask task, int priority) - { - return new PrioritizedListenableFutureTask<>(task, priority); - } - - private final ListenableFutureTask delegate; - private final int priority; - - PrioritizedListenableFutureTask(ListenableFutureTask delegate, int priority) - { - this.delegate = delegate; - this.priority = priority; - } - @Override - public void run() + public int compare( + PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right + ) { - delegate.run(); + return Integer.compare(right.getPriority(), left.getPriority()); } + }.compound( + new Ordering() + { + @Override + public int compare(PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right) + { + return Long.compare(right.getInsertionPlace(), left.getInsertionPlace()); + } + } + ); - @Override - public boolean cancel(boolean mayInterruptIfRunning) - { - return delegate.cancel(mayInterruptIfRunning); - } + public static PrioritizedListenableFutureTask create( + PrioritizedRunnable task, + @Nullable V result, + long position + ) + { + return new PrioritizedListenableFutureTask<>( + ListenableFutureTask.create(task, result), + task.getPriority(), + position + ); + } - @Override - public boolean isCancelled() - { - return delegate.isCancelled(); - } + public static PrioritizedListenableFutureTask create(PrioritizedCallable callable, long position) + { + return new PrioritizedListenableFutureTask<>( + ListenableFutureTask.create(callable), + callable.getPriority(), + position + ); + } - @Override - public boolean isDone() - { - return delegate.isDone(); - } + public static PrioritizedListenableFutureTask create(ListenableFutureTask task, int priority, long position) + { + return new PrioritizedListenableFutureTask<>(task, priority, position); + } - @Override - public V get() throws InterruptedException, ExecutionException - { - return delegate.get(); - } + private final ListenableFutureTask delegate; + private final int priority; + private final long insertionPlace; - @Override - public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - return delegate.get(timeout, unit); - } + PrioritizedListenableFutureTask(ListenableFutureTask delegate, int priority, long position) + { + this.delegate = delegate; + this.priority = priority; + this.insertionPlace = position; // Long.MAX_VALUE will always be "highest" + } - @Override - public void addListener(Runnable listener, Executor executor) - { - delegate.addListener(listener, executor); - } + @Override + public void run() + { + delegate.run(); + } - @Override - public int getPriority() - { - return priority; - } + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return delegate.cancel(mayInterruptIfRunning); + } - @Override - public int compareTo(PrioritizedListenableFutureTask otherTask) - { - return Ints.compare(otherTask.getPriority(), getPriority()); - } + @Override + public boolean isCancelled() + { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() + { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException + { + return delegate.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.get(timeout, unit); + } + + @Override + public void addListener(Runnable listener, Executor executor) + { + delegate.addListener(listener, executor); + } + + @Override + public int getPriority() + { + return priority; + } + + protected long getInsertionPlace() + { + return insertionPlace; + } + + @Override + public int compareTo(PrioritizedListenableFutureTask otherTask) + { + return PRIORITY_COMPARATOR.compare(this, otherTask); } } diff --git a/processing/src/main/java/io/druid/query/PrioritizedRunnable.java b/processing/src/main/java/io/druid/query/PrioritizedRunnable.java index 88a1d403d5a..d414a039a30 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedRunnable.java +++ b/processing/src/main/java/io/druid/query/PrioritizedRunnable.java @@ -19,5 +19,5 @@ package io.druid.query; public interface PrioritizedRunnable extends Runnable { - public int getPriority(); + int getPriority(); } diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index c40b5056207..dde97d45760 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -62,7 +62,7 @@ public class ChainedExecutionQueryRunnerTest public void testQueryCancellation() throws Exception { ExecutorService exec = PrioritizedExecutorService.create( - new Lifecycle(), new ExecutorServiceConfig() + new Lifecycle(), new DruidProcessingConfig() { @Override public String getFormatString() @@ -189,7 +189,7 @@ public class ChainedExecutionQueryRunnerTest public void testQueryTimeout() throws Exception { ExecutorService exec = PrioritizedExecutorService.create( - new Lifecycle(), new ExecutorServiceConfig() + new Lifecycle(), new DruidProcessingConfig() { @Override public String getFormatString() diff --git a/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java b/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java index 816454a789a..8529c7a9bb3 100644 --- a/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java +++ b/processing/src/test/java/io/druid/query/PrioritizedExecutorServiceTest.java @@ -17,32 +17,70 @@ package io.druid.query; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.lifecycle.Lifecycle; +import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; +import org.junit.internal.AssumptionViolatedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; /** */ +@RunWith(Parameterized.class) public class PrioritizedExecutorServiceTest { - private ExecutorService exec; + private PrioritizedExecutorService exec; private CountDownLatch latch; private CountDownLatch finishLatch; + private final boolean useFifo; + private final DruidProcessingConfig config; + + @Parameterized.Parameters(name = "{0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of(new Object[]{true}, new Object[]{false}); + } + + public PrioritizedExecutorServiceTest(final boolean useFifo) + { + this.useFifo = useFifo; + this.config = new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + + @Override + public boolean isFifo() + { + return useFifo; + } + }; + } @Before public void setUp() throws Exception { exec = PrioritizedExecutorService.create( new Lifecycle(), - new ExecutorServiceConfig() + new DruidProcessingConfig() { @Override public String getFormatString() @@ -55,6 +93,12 @@ public class PrioritizedExecutorServiceTest { return 1; } + + @Override + public boolean isFifo() + { + return useFifo; + } } ); @@ -62,6 +106,12 @@ public class PrioritizedExecutorServiceTest finishLatch = new CountDownLatch(3); } + @After + public void tearDown() + { + exec.shutdownNow(); + } + /** * Submits a normal priority task to block the queue, followed by low, high, normal priority tasks. * Tests to see that the high priority task is executed first, followed by the normal and low priority tasks. @@ -130,4 +180,222 @@ public class PrioritizedExecutorServiceTest List expected = ImmutableList.of(2, 0, -1); Assert.assertEquals(expected, ImmutableList.copyOf(order)); } + + // Make sure entries are processed FIFO + @Test + public void testOrderedExecutionEqualPriorityRunnable() throws ExecutionException, InterruptedException + { + final int numTasks = 100; + final List> futures = Lists.newArrayListWithExpectedSize(numTasks); + final AtomicInteger hasRun = new AtomicInteger(0); + for (int i = 0; i < numTasks; ++i) { + futures.add(exec.submit(getCheckingPrioritizedRunnable(i, hasRun))); + } + latch.countDown(); + checkFutures(futures); + } + + @Test + public void testOrderedExecutionEqualPriorityCallable() throws ExecutionException, InterruptedException + { + final int numTasks = 1_000; + final List> futures = Lists.newArrayListWithExpectedSize(numTasks); + final AtomicInteger hasRun = new AtomicInteger(0); + for (int i = 0; i < numTasks; ++i) { + futures.add(exec.submit(getCheckingPrioritizedCallable(i, hasRun))); + } + latch.countDown(); + checkFutures(futures); + } + + @Test + public void testOrderedExecutionEqualPriorityMix() throws ExecutionException, InterruptedException + { + exec = new PrioritizedExecutorService( + exec.threadPoolExecutor, true, 0, config + ); + final int numTasks = 1_000; + final List> futures = Lists.newArrayListWithExpectedSize(numTasks); + final AtomicInteger hasRun = new AtomicInteger(0); + final Random random = new Random(789401); + for (int i = 0; i < numTasks; ++i) { + switch (random.nextInt(4)) { + case 0: + futures.add(exec.submit(getCheckingPrioritizedCallable(i, hasRun))); + break; + case 1: + futures.add(exec.submit(getCheckingPrioritizedRunnable(i, hasRun))); + break; + case 2: + futures.add(exec.submit(getCheckingCallable(i, hasRun))); + break; + case 3: + futures.add(exec.submit(getCheckingRunnable(i, hasRun))); + break; + default: + Assert.fail("Bad random result"); + } + } + latch.countDown(); + checkFutures(futures); + } + + @Test + public void testOrderedExecutionMultiplePriorityMix() throws ExecutionException, InterruptedException + { + final int DEFAULT = 0; + final int MIN = -1; + final int MAX = 1; + exec = new PrioritizedExecutorService(exec.threadPoolExecutor, true, DEFAULT, config); + final int numTasks = 999; + final int[] priorities = new int[]{MAX, DEFAULT, MIN}; + final int tasksPerPriority = numTasks / priorities.length; + final int[] priorityOffsets = new int[]{0, tasksPerPriority, tasksPerPriority * 2}; + final List> futures = Lists.newArrayListWithExpectedSize(numTasks); + final AtomicInteger hasRun = new AtomicInteger(0); + final Random random = new Random(789401); + for (int i = 0; i < numTasks; ++i) { + final int priorityBucket = i % priorities.length; + final int myPriority = priorities[priorityBucket]; + final int priorityOffset = priorityOffsets[priorityBucket]; + final int expectedPriorityOrder = i / priorities.length; + if (random.nextBoolean()) { + futures.add( + exec.submit( + getCheckingPrioritizedCallable( + priorityOffset + expectedPriorityOrder, + hasRun, + myPriority + ) + ) + ); + } else { + futures.add( + exec.submit( + getCheckingPrioritizedRunnable( + priorityOffset + expectedPriorityOrder, + hasRun, + myPriority + ) + ) + ); + } + } + latch.countDown(); + checkFutures(futures); + } + + private void checkFutures(Iterable> futures) throws InterruptedException, ExecutionException + { + for (ListenableFuture future : futures) { + try { + future.get(); + } + catch (ExecutionException e) { + if (!(e.getCause() instanceof AssumptionViolatedException)) { + throw e; + } + } + } + } + + private PrioritizedCallable getCheckingPrioritizedCallable( + final int myOrder, + final AtomicInteger hasRun + ) + { + return getCheckingPrioritizedCallable(myOrder, hasRun, 0); + } + + private PrioritizedCallable getCheckingPrioritizedCallable( + final int myOrder, + final AtomicInteger hasRun, + final int priority + ) + { + final Callable delegate = getCheckingCallable(myOrder, hasRun); + return new AbstractPrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + return delegate.call(); + } + }; + } + + private Callable getCheckingCallable( + final int myOrder, + final AtomicInteger hasRun + ) + { + final Runnable runnable = getCheckingRunnable(myOrder, hasRun); + return new Callable() + { + @Override + public Boolean call() + { + runnable.run(); + return true; + } + }; + } + + private Runnable getCheckingRunnable( + final int myOrder, + final AtomicInteger hasRun + ) + { + return new Runnable() + { + @Override + public void run() + { + try { + latch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + if (useFifo) { + Assert.assertEquals(myOrder, hasRun.getAndIncrement()); + } else { + Assume.assumeTrue(Integer.compare(myOrder, hasRun.getAndIncrement()) == 0); + } + } + }; + } + + + private PrioritizedRunnable getCheckingPrioritizedRunnable( + final int myOrder, + final AtomicInteger hasRun + ) + { + return getCheckingPrioritizedRunnable(myOrder, hasRun, 0); + } + + private PrioritizedRunnable getCheckingPrioritizedRunnable( + final int myOrder, + final AtomicInteger hasRun, + final int priority + ) + { + final Runnable delegate = getCheckingRunnable(myOrder, hasRun); + return new PrioritizedRunnable() + { + @Override + public int getPriority() + { + return priority; + } + + @Override + public void run() + { + delegate.run(); + } + }; + } } diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 28bb3753eb9..cc78c61dfbc 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -82,7 +82,7 @@ public class DruidProcessingModule implements Module @Processing @ManageLifecycle public ExecutorService getProcessingExecutorService( - ExecutorServiceConfig config, + DruidProcessingConfig config, ServiceEmitter emitter, Lifecycle lifecycle )