handle union query on realtime node

This commit is contained in:
nishantmonu51 2014-11-07 23:27:50 +05:30
parent 8bebb24fd5
commit fd8eb7742b
6 changed files with 34 additions and 17 deletions

View File

@ -40,13 +40,16 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
@ -59,16 +62,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final TaskToolboxFactory toolboxFactory;
private final ListeningExecutorService exec;
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private final QueryRunnerFactoryConglomerate conglomerate;
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
@Inject
public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory
TaskToolboxFactory toolboxFactory,
QueryRunnerFactoryConglomerate conglomerate
)
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
this.conglomerate = conglomerate;
}
@LifecycleStop
@ -152,8 +157,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final List<String> dataSources = query.getDataSource().getNames();
List<QueryRunner> runners = Lists.newArrayList();
for(String queryDataSource : dataSources) {
QueryRunner<T> queryRunner = null;
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
@ -170,9 +177,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
}
}
if(queryRunner!= null) {
runners.add(queryRunner);
}
}
if(runners.size() == 0){
return new NoopQueryRunner();
} else if (runners.size() == 1){
return runners.get(0);
} else {
return new UnionQueryRunner<>(runners, conglomerate.findFactory(query).getToolchest());
}
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
}
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem

View File

@ -208,7 +208,7 @@ public class TaskLifecycleTest
),
new DefaultObjectMapper()
);
tr = new ThreadPoolTaskRunner(tb);
tr = new ThreadPoolTaskRunner(tb, null);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
tq.start();
}

View File

@ -180,7 +180,8 @@ public class WorkerTaskMonitorTest
}
)
), jsonMapper
)
),
null
),
new WorkerConfig().setCapacity(1)
);

View File

@ -31,11 +31,11 @@ import java.util.List;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final Iterable<QueryRunner<T>> baseRunners;
private final Iterable<QueryRunner> baseRunners;
private final QueryToolChest<T, Query<T>> toolChest;
public UnionQueryRunner(
Iterable<QueryRunner<T>> baseRunners,
Iterable<QueryRunner> baseRunners,
QueryToolChest<T, Query<T>> toolChest
)
{
@ -50,10 +50,10 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
Sequences.simple(
Iterables.transform(
baseRunners,
new Function<QueryRunner<T>, Sequence<T>>()
new Function<QueryRunner, Sequence<T>>()
{
@Override
public Sequence<T> apply(QueryRunner<T> singleRunner)
public Sequence<T> apply(QueryRunner singleRunner)
{
return singleRunner.run(
query

View File

@ -267,11 +267,11 @@ public class QueryRunnerTestHelper
factory.getToolchest().mergeResults(
new UnionQueryRunner<T>(
Iterables.transform(
unionDataSource.getNames(), new Function<String, QueryRunner<T>>()
unionDataSource.getNames(), new Function<String, QueryRunner>()
{
@Nullable
@Override
public QueryRunner<T> apply(@Nullable String input)
public QueryRunner apply(@Nullable String input)
{
return new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),

View File

@ -125,18 +125,18 @@ public class RealtimeManager implements QuerySegmentWalker
final List<String> names = query.getDataSource().getNames();
if (names.size() == 1) {
final FireChief chief = chiefs.get(names.get(0));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query);
} else {
return new UnionQueryRunner<>(
Iterables.transform(
names, new Function<String, QueryRunner<T>>()
names, new Function<String, QueryRunner>()
{
@Nullable
@Override
public QueryRunner<T> apply(@Nullable String input)
{
final FireChief chief = chiefs.get(input);
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query);
}
}
), conglomerate.findFactory(query).getToolchest()