This commit is contained in:
fjy 2013-05-30 12:58:24 -07:00
parent 71feb0afeb
commit ffbf0e23b6
3 changed files with 31 additions and 9 deletions

View File

@ -124,13 +124,20 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final String priority = query.getContextValue("priority", Integer.toString(Queries.Priority.NORMAL.ordinal()));
final String priority = query.getContextValue("priority", Queries.Priority.NORMAL.name());
final Query<T> prioritizedQuery = query.withOverriddenContext(ImmutableMap.of("priority", priority));
final Query<T> rewrittenQuery;
if (populateCache) {
rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true"));
rewrittenQuery = prioritizedQuery.withOverriddenContext(
ImmutableMap.of(
"bySegment",
"true",
"intermediate",
"true"
)
);
} else {
rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
}

View File

@ -24,7 +24,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@ -35,7 +34,6 @@ import com.metamx.druid.Query;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -105,12 +103,12 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public int getPriority()
{
return Ints.tryParse(
return Queries.Priority.valueOf(
query.getContextValue(
"priority",
Integer.toString(Queries.Priority.NORMAL.ordinal())
Queries.Priority.NORMAL.name()
)
);
).ordinal();
}
@Override

View File

@ -116,9 +116,26 @@ public class PriorityExecutorService extends AbstractExecutorService
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> tCallable)
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> tCallable)
{
return new PrioritizedFuture<T>((PrioritizedCallable) tCallable);
Callable<T> theCallable = tCallable;
if (!(tCallable instanceof PrioritizedCallable)) {
theCallable = new PrioritizedCallable<T>()
{
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
}
@Override
public T call() throws Exception
{
return tCallable.call();
}
};
}
return new PrioritizedFuture<T>((PrioritizedCallable) theCallable);
}
private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture>