diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 0b310aa401d..231799d3a31 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -40,13 +40,16 @@ import io.druid.indexing.common.task.Task; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; +import io.druid.query.UnionQueryRunner; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import java.io.File; import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; @@ -59,16 +62,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final TaskToolboxFactory toolboxFactory; private final ListeningExecutorService exec; private final Set runningItems = new ConcurrentSkipListSet<>(); - + private final QueryRunnerFactoryConglomerate conglomerate; private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); @Inject public ThreadPoolTaskRunner( - TaskToolboxFactory toolboxFactory + TaskToolboxFactory toolboxFactory, + QueryRunnerFactoryConglomerate conglomerate ) { this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d")); + this.conglomerate = conglomerate; } @LifecycleStop @@ -152,8 +157,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private QueryRunner getQueryRunnerImpl(Query query) { - QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); + final List dataSources = query.getDataSource().getNames(); + List runners = Lists.newArrayList(); + for(String queryDataSource : dataSources) { + QueryRunner queryRunner = null; for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { final Task task = taskRunnerWorkItem.getTask(); @@ -170,9 +177,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker } } } + } + if(queryRunner!= null) { + runners.add(queryRunner); + } + } + if(runners.size() == 0){ + return new NoopQueryRunner(); + } else if (runners.size() == 1){ + return runners.get(0); + } else { + return new UnionQueryRunner<>(runners, conglomerate.findFactory(query).getToolchest()); } - - return queryRunner == null ? new NoopQueryRunner() : queryRunner; } private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 44bbb4c8f6d..1a3c9cec42f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -208,7 +208,7 @@ public class TaskLifecycleTest ), new DefaultObjectMapper() ); - tr = new ThreadPoolTaskRunner(tb); + tr = new ThreadPoolTaskRunner(tb, null); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); tq.start(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 11756824c34..478a77494a3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -180,7 +180,8 @@ public class WorkerTaskMonitorTest } ) ), jsonMapper - ) + ), + null ), new WorkerConfig().setCapacity(1) ); diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 09eba3bf5be..acd07e67404 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -31,11 +31,11 @@ import java.util.List; public class UnionQueryRunner implements QueryRunner { - private final Iterable> baseRunners; + private final Iterable baseRunners; private final QueryToolChest> toolChest; public UnionQueryRunner( - Iterable> baseRunners, + Iterable baseRunners, QueryToolChest> toolChest ) { @@ -50,10 +50,10 @@ public class UnionQueryRunner implements QueryRunner Sequences.simple( Iterables.transform( baseRunners, - new Function, Sequence>() + new Function>() { @Override - public Sequence apply(QueryRunner singleRunner) + public Sequence apply(QueryRunner singleRunner) { return singleRunner.run( query diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 15ba20ddb50..2163dc12f72 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -267,11 +267,11 @@ public class QueryRunnerTestHelper factory.getToolchest().mergeResults( new UnionQueryRunner( Iterables.transform( - unionDataSource.getNames(), new Function>() + unionDataSource.getNames(), new Function() { @Nullable @Override - public QueryRunner apply(@Nullable String input) + public QueryRunner apply(@Nullable String input) { return new BySegmentQueryRunner( segmentId, adapter.getDataInterval().getStart(), diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 3998c7503b1..c8a6fb30668 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -125,18 +125,18 @@ public class RealtimeManager implements QuerySegmentWalker final List names = query.getDataSource().getNames(); if (names.size() == 1) { final FireChief chief = chiefs.get(names.get(0)); - return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query); + return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query); } else { return new UnionQueryRunner<>( Iterables.transform( - names, new Function>() + names, new Function() { @Nullable @Override public QueryRunner apply(@Nullable String input) { final FireChief chief = chiefs.get(input); - return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query); + return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query); } } ), conglomerate.findFactory(query).getToolchest()