fix according to code review

This commit is contained in:
fjy 2013-06-06 15:37:19 -07:00
parent d48b4d8d97
commit e20f547261
6 changed files with 38 additions and 57 deletions

View File

@ -125,25 +125,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final String priority = query.getContextValue("priority", Queries.Priority.NORMAL.name());
final Query<T> prioritizedQuery = query.withOverriddenContext(ImmutableMap.of("priority", priority));
final Query<T> rewrittenQuery;
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<String, String>();
final String priority = query.getContextValue("priority", "0");
contextBuilder.put("priority", priority);
if (populateCache) {
rewrittenQuery = prioritizedQuery.withOverriddenContext(
ImmutableMap.of(
"bySegment",
"true",
"intermediate",
"true"
)
);
} else {
rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
contextBuilder.put("bySegment", "true");
}
contextBuilder.put("intermediate", "true");
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(prioritizedQuery.getDataSource());
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {
return Sequences.empty();
}
@ -159,7 +155,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Let tool chest filter out unneeded segments
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
toolChest.filterSegments(prioritizedQuery, serversLookup);
toolChest.filterSegments(query, serversLookup);
for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
@ -174,7 +170,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final byte[] queryCacheKey;
if (strategy != null) {
queryCacheKey = strategy.computeCacheKey(prioritizedQuery);
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@ -82,6 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final int priority = Ints.tryParse(query.getContextValue("priority", "0"));
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@ -103,12 +105,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public int getPriority()
{
return Queries.Priority.valueOf(
query.getContextValue(
"priority",
Queries.Priority.NORMAL.name()
)
).ordinal();
return priority;
}
@Override

View File

@ -36,11 +36,11 @@ import java.util.concurrent.TimeUnit;
/**
*/
public class PriorityExecutorService extends AbstractExecutorService
public class PrioritizedExecutorService extends AbstractExecutorService
{
public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
{
final ExecutorService service = new PriorityExecutorService(
final ExecutorService service = new PrioritizedExecutorService(
new ThreadPoolExecutor(
config.getNumThreads(),
config.getNumThreads(),
@ -69,10 +69,12 @@ public class PriorityExecutorService extends AbstractExecutorService
return service;
}
private static final int DEFAULT_PRIORITY = 0;
private final ThreadPoolExecutor threadPoolExecutor;
public PriorityExecutorService(
public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor
)
{
@ -125,7 +127,7 @@ public class PriorityExecutorService extends AbstractExecutorService
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
return DEFAULT_PRIORITY;
}
@Override
@ -156,7 +158,7 @@ public class PriorityExecutorService extends AbstractExecutorService
@Override
public int compareTo(PrioritizedFuture future)
{
return Ints.compare(getPriority(), future.getPriority());
return -Ints.compare(getPriority(), future.getPriority());
}
}
}

View File

@ -34,11 +34,6 @@ import java.util.Set;
*/
public class Queries
{
public static enum Priority
{
HIGH, NORMAL, LOW
}
public static void verifyAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs

View File

@ -19,7 +19,7 @@
package com.metamx.druid.query;
import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableList;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import junit.framework.Assert;
@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
/**
*/
public class PriorityExecutorServiceTest
public class PrioritizedExecutorServiceTest
{
private ExecutorService exec;
private CountDownLatch latch;
@ -42,7 +42,7 @@ public class PriorityExecutorServiceTest
@Before
public void setUp() throws Exception
{
exec = PriorityExecutorService.create(
exec = PrioritizedExecutorService.create(
new Lifecycle(),
new ExecutorServiceConfig()
{
@ -73,7 +73,7 @@ public class PriorityExecutorServiceTest
@Test
public void testSubmit() throws Exception
{
final ConcurrentLinkedQueue<Queries.Priority> order = new ConcurrentLinkedQueue<Queries.Priority>();
final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
exec.submit(
new PrioritizedCallable<Void>()
@ -81,7 +81,7 @@ public class PriorityExecutorServiceTest
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
return 0;
}
@Override
@ -99,13 +99,13 @@ public class PriorityExecutorServiceTest
@Override
public int getPriority()
{
return Queries.Priority.LOW.ordinal();
return -1;
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.LOW);
order.add(-1);
finishLatch.countDown();
return null;
}
@ -117,13 +117,13 @@ public class PriorityExecutorServiceTest
@Override
public int getPriority()
{
return Queries.Priority.HIGH.ordinal();
return 0;
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.HIGH);
order.add(0);
finishLatch.countDown();
return null;
}
@ -135,13 +135,13 @@ public class PriorityExecutorServiceTest
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
return 2;
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.NORMAL);
order.add(2);
finishLatch.countDown();
return null;
}
@ -153,16 +153,7 @@ public class PriorityExecutorServiceTest
Assert.assertTrue(order.size() == 3);
List<Queries.Priority> expected = Lists.newArrayList(
Queries.Priority.HIGH,
Queries.Priority.NORMAL,
Queries.Priority.LOW
);
int i = 0;
for (Queries.Priority priority : order) {
Assert.assertEquals(expected.get(i), priority);
i++;
}
List<Integer> expected = ImmutableList.of(2, 0, -1);
Assert.assertEquals(expected, ImmutableList.copyOf(order));
}
}

View File

@ -39,7 +39,7 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.PriorityExecutorService;
import com.metamx.druid.query.PrioritizedExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -97,11 +97,11 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService innerExecutorService = PriorityExecutorService.create(
final ExecutorService innerExecutorService = PrioritizedExecutorService.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
), callable2
);
final ExecutorService executorService = new MetricsEmittingExecutorService(