diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 1996a420a33..72519cbeb8c 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -125,25 +125,21 @@ public class CachingClusteredClient implements QueryRunner final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null; final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); - final String priority = query.getContextValue("priority", Queries.Priority.NORMAL.name()); - final Query prioritizedQuery = query.withOverriddenContext(ImmutableMap.of("priority", priority)); - final Query rewrittenQuery; + ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder(); + + final String priority = query.getContextValue("priority", "0"); + contextBuilder.put("priority", priority); + if (populateCache) { - rewrittenQuery = prioritizedQuery.withOverriddenContext( - ImmutableMap.of( - "bySegment", - "true", - "intermediate", - "true" - ) - ); - } else { - rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("intermediate", "true")); + contextBuilder.put("bySegment", "true"); } + contextBuilder.put("intermediate", "true"); - VersionedIntervalTimeline timeline = serverView.getTimeline(prioritizedQuery.getDataSource()); + final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); + + VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { return Sequences.empty(); } @@ -159,7 +155,7 @@ public class CachingClusteredClient implements QueryRunner // Let tool chest filter out unneeded segments final List> filteredServersLookup = - toolChest.filterSegments(prioritizedQuery, serversLookup); + toolChest.filterSegments(query, serversLookup); for (TimelineObjectHolder holder : filteredServersLookup) { for (PartitionChunk chunk : holder.getObject()) { @@ -174,7 +170,7 @@ public class CachingClusteredClient implements QueryRunner final byte[] queryCacheKey; if (strategy != null) { - queryCacheKey = strategy.computeCacheKey(prioritizedQuery); + queryCacheKey = strategy.computeCacheKey(query); } else { queryCacheKey = null; } diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java index e3c7962ced0..cf4e31a0f1c 100644 --- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java +++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; @@ -82,6 +83,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { + final int priority = Ints.tryParse(query.getContextValue("priority", "0")); return new BaseSequence>( new BaseSequence.IteratorMaker>() { @@ -103,12 +105,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public int getPriority() { - return Queries.Priority.valueOf( - query.getContextValue( - "priority", - Queries.Priority.NORMAL.name() - ) - ).ordinal(); + return priority; } @Override diff --git a/client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java similarity index 92% rename from client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java rename to client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java index 44cf6bcb24f..46a7e07486e 100644 --- a/client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java @@ -36,11 +36,11 @@ import java.util.concurrent.TimeUnit; /** */ -public class PriorityExecutorService extends AbstractExecutorService +public class PrioritizedExecutorService extends AbstractExecutorService { public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) { - final ExecutorService service = new PriorityExecutorService( + final ExecutorService service = new PrioritizedExecutorService( new ThreadPoolExecutor( config.getNumThreads(), config.getNumThreads(), @@ -69,10 +69,12 @@ public class PriorityExecutorService extends AbstractExecutorService return service; } + private static final int DEFAULT_PRIORITY = 0; + private final ThreadPoolExecutor threadPoolExecutor; - public PriorityExecutorService( + public PrioritizedExecutorService( ThreadPoolExecutor threadPoolExecutor ) { @@ -125,7 +127,7 @@ public class PriorityExecutorService extends AbstractExecutorService @Override public int getPriority() { - return Queries.Priority.NORMAL.ordinal(); + return DEFAULT_PRIORITY; } @Override @@ -156,7 +158,7 @@ public class PriorityExecutorService extends AbstractExecutorService @Override public int compareTo(PrioritizedFuture future) { - return Ints.compare(getPriority(), future.getPriority()); + return -Ints.compare(getPriority(), future.getPriority()); } } } diff --git a/client/src/main/java/com/metamx/druid/query/Queries.java b/client/src/main/java/com/metamx/druid/query/Queries.java index ad64621791e..0ebc18e64fd 100644 --- a/client/src/main/java/com/metamx/druid/query/Queries.java +++ b/client/src/main/java/com/metamx/druid/query/Queries.java @@ -34,11 +34,6 @@ import java.util.Set; */ public class Queries { - public static enum Priority - { - HIGH, NORMAL, LOW - } - public static void verifyAggregations( List aggFactories, List postAggs diff --git a/client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java b/client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java similarity index 79% rename from client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java rename to client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java index cfb96f701f9..b0ac2ab2ae2 100644 --- a/client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java +++ b/client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java @@ -19,7 +19,7 @@ package com.metamx.druid.query; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableList; import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.lifecycle.Lifecycle; import junit.framework.Assert; @@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService; /** */ -public class PriorityExecutorServiceTest +public class PrioritizedExecutorServiceTest { private ExecutorService exec; private CountDownLatch latch; @@ -42,7 +42,7 @@ public class PriorityExecutorServiceTest @Before public void setUp() throws Exception { - exec = PriorityExecutorService.create( + exec = PrioritizedExecutorService.create( new Lifecycle(), new ExecutorServiceConfig() { @@ -73,7 +73,7 @@ public class PriorityExecutorServiceTest @Test public void testSubmit() throws Exception { - final ConcurrentLinkedQueue order = new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue order = new ConcurrentLinkedQueue(); exec.submit( new PrioritizedCallable() @@ -81,7 +81,7 @@ public class PriorityExecutorServiceTest @Override public int getPriority() { - return Queries.Priority.NORMAL.ordinal(); + return 0; } @Override @@ -99,13 +99,13 @@ public class PriorityExecutorServiceTest @Override public int getPriority() { - return Queries.Priority.LOW.ordinal(); + return -1; } @Override public Void call() throws Exception { - order.add(Queries.Priority.LOW); + order.add(-1); finishLatch.countDown(); return null; } @@ -117,13 +117,13 @@ public class PriorityExecutorServiceTest @Override public int getPriority() { - return Queries.Priority.HIGH.ordinal(); + return 0; } @Override public Void call() throws Exception { - order.add(Queries.Priority.HIGH); + order.add(0); finishLatch.countDown(); return null; } @@ -135,13 +135,13 @@ public class PriorityExecutorServiceTest @Override public int getPriority() { - return Queries.Priority.NORMAL.ordinal(); + return 2; } @Override public Void call() throws Exception { - order.add(Queries.Priority.NORMAL); + order.add(2); finishLatch.countDown(); return null; } @@ -153,16 +153,7 @@ public class PriorityExecutorServiceTest Assert.assertTrue(order.size() == 3); - List expected = Lists.newArrayList( - Queries.Priority.HIGH, - Queries.Priority.NORMAL, - Queries.Priority.LOW - ); - - int i = 0; - for (Queries.Priority priority : order) { - Assert.assertEquals(expected.get(i), priority); - i++; - } + List expected = ImmutableList.of(2, 0, -1); + Assert.assertEquals(expected, ImmutableList.copyOf(order)); } } diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index e047fcae22f..dd124aabcf2 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -39,7 +39,7 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.SegmentLoader; import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.query.MetricsEmittingExecutorService; -import com.metamx.druid.query.PriorityExecutorService; +import com.metamx.druid.query.PrioritizedExecutorService; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -97,11 +97,11 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); - final ExecutorService innerExecutorService = PriorityExecutorService.create( + final ExecutorService innerExecutorService = PrioritizedExecutorService.create( getLifecycle(), getConfigFactory().buildWithReplacements( ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") - ) + ), callable2 ); final ExecutorService executorService = new MetricsEmittingExecutorService(