mirror of https://github.com/apache/druid.git
more code review fixes
This commit is contained in:
parent
e20f547261
commit
19c5dacf3a
|
@ -24,7 +24,6 @@ import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Ordering;
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.common.primitives.Ints;
|
|
||||||
import com.metamx.common.guava.BaseSequence;
|
import com.metamx.common.guava.BaseSequence;
|
||||||
import com.metamx.common.guava.MergeIterable;
|
import com.metamx.common.guava.MergeIterable;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
|
@ -83,7 +82,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
||||||
@Override
|
@Override
|
||||||
public Sequence<T> run(final Query<T> query)
|
public Sequence<T> run(final Query<T> query)
|
||||||
{
|
{
|
||||||
final int priority = Ints.tryParse(query.getContextValue("priority", "0"));
|
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
|
||||||
|
|
||||||
return new BaseSequence<T, Iterator<T>>(
|
return new BaseSequence<T, Iterator<T>>(
|
||||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||||
{
|
{
|
||||||
|
@ -100,14 +100,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
||||||
public Future<List<T>> apply(final QueryRunner<T> input)
|
public Future<List<T>> apply(final QueryRunner<T> input)
|
||||||
{
|
{
|
||||||
return exec.submit(
|
return exec.submit(
|
||||||
new PrioritizedCallable<List<T>>()
|
new PrioritizedCallable<List<T>>(priority)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return priority;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<T> call() throws Exception
|
public List<T> call() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -23,7 +23,17 @@ import java.util.concurrent.Callable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public interface PrioritizedCallable<T> extends Callable<T>
|
public abstract class PrioritizedCallable<T> implements Callable<T>
|
||||||
{
|
{
|
||||||
public int getPriority();
|
final int priority;
|
||||||
|
|
||||||
|
public PrioritizedCallable(int priority)
|
||||||
|
{
|
||||||
|
this.priority = priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPriority()
|
||||||
|
{
|
||||||
|
return priority;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,14 +122,8 @@ public class PrioritizedExecutorService extends AbstractExecutorService
|
||||||
{
|
{
|
||||||
Callable<T> theCallable = tCallable;
|
Callable<T> theCallable = tCallable;
|
||||||
if (!(tCallable instanceof PrioritizedCallable)) {
|
if (!(tCallable instanceof PrioritizedCallable)) {
|
||||||
theCallable = new PrioritizedCallable<T>()
|
theCallable = new PrioritizedCallable<T>(DEFAULT_PRIORITY)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return DEFAULT_PRIORITY;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T call() throws Exception
|
public T call() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -76,14 +76,8 @@ public class PrioritizedExecutorServiceTest
|
||||||
final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
|
final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
|
||||||
|
|
||||||
exec.submit(
|
exec.submit(
|
||||||
new PrioritizedCallable<Void>()
|
new PrioritizedCallable<Void>(0)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception
|
public Void call() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -94,14 +88,8 @@ public class PrioritizedExecutorServiceTest
|
||||||
);
|
);
|
||||||
|
|
||||||
exec.submit(
|
exec.submit(
|
||||||
new PrioritizedCallable<Void>()
|
new PrioritizedCallable<Void>(-1)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception
|
public Void call() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -112,14 +100,8 @@ public class PrioritizedExecutorServiceTest
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
exec.submit(
|
exec.submit(
|
||||||
new PrioritizedCallable<Void>()
|
new PrioritizedCallable<Void>(0)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception
|
public Void call() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -130,14 +112,8 @@ public class PrioritizedExecutorServiceTest
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
exec.submit(
|
exec.submit(
|
||||||
new PrioritizedCallable<Void>()
|
new PrioritizedCallable<Void>(2)
|
||||||
{
|
{
|
||||||
@Override
|
|
||||||
public int getPriority()
|
|
||||||
{
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception
|
public Void call() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
|
||||||
getLifecycle(),
|
getLifecycle(),
|
||||||
getConfigFactory().buildWithReplacements(
|
getConfigFactory().buildWithReplacements(
|
||||||
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
|
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
|
||||||
), callable2
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
final ExecutorService executorService = new MetricsEmittingExecutorService(
|
final ExecutorService executorService = new MetricsEmittingExecutorService(
|
||||||
|
|
Loading…
Reference in New Issue