diff --git a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java b/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java deleted file mode 100644 index 18302fe08bc..00000000000 --- a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.timeline; - -import com.google.common.base.Function; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.metamx.common.guava.Comparators; -import io.druid.timeline.partition.PartitionHolder; -import org.joda.time.Interval; - -import java.util.Comparator; - - -public class UnionTimeLineLookup implements TimelineLookup -{ - Iterable> delegates; - - public UnionTimeLineLookup(Iterable> delegates) - { - // delegate can be null in case there is no segment loaded for the dataSource on this node - this.delegates = Iterables.filter(delegates, Predicates.notNull()); - } - - @Override - public Iterable> lookup(final Interval interval) - { - return Iterables.mergeSorted( - Iterables.transform( - delegates, - new Function, Iterable>>() - { - @Override - public Iterable> apply(TimelineLookup input) - { - return input.lookup(interval); - } - } - ), - new Comparator>() - { - @Override - public int compare( - TimelineObjectHolder o1, TimelineObjectHolder o2 - ) - { - return Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval()); - } - } - ); - } - - public PartitionHolder findEntry(Interval interval, VersionType version) - { - for (TimelineLookup delegate : delegates) { - final PartitionHolder entry = delegate.findEntry(interval, version); - if (entry != null) { - return entry; - } - } - return null; - } -} diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java index c6651199351..2f19ad3b333 100644 --- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java @@ -1596,79 +1596,4 @@ public class VersionedIntervalTimelineTest return new VersionedIntervalTimeline(Ordering.natural()); } - @Test - public void testUnionTimeLineLookup() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - timeline - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) - ); - } - - @Test - public void testUnionTimeLineLookupNonExistentDelegates() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - null, - timeline, - null - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) ); - } - - @Test - public void testUnionTimeLineLookupReturnsSortedValues() - { - timeline = makeStringIntegerTimeline(); - add("2011-04-02/2011-04-06", "1", 1); - add("2011-04-03/2011-04-09", "9", 2); - VersionedIntervalTimeline t1 = timeline; - timeline = makeStringIntegerTimeline(); - add("2011-04-01/2011-04-03", "2", 1); - add("2011-04-03/2011-04-10", "8", 2); - VersionedIntervalTimeline t2 = timeline; - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - t1, t2 - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-03", "2", 1), - createExpected("2011-04-02/2011-04-03", "1", 1), - createExpected("2011-04-03/2011-04-09", "9", 2), - createExpected("2011-04-03/2011-04-10", "8", 2) - - ), - (List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11"))) - ); - } - - - } diff --git a/docs/content/querying/datasource.md b/docs/content/querying/datasource.md index 045aae11929..68fd0d9dba4 100644 --- a/docs/content/querying/datasource.md +++ b/docs/content/querying/datasource.md @@ -28,6 +28,7 @@ This data source unions two or more table data sources. ``` Note that the data sources being unioned should have the same schema. +Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes. ### Query Data Source diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 8e598bee2d0..462179d0b15 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -153,43 +153,29 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker return getQueryRunnerImpl(query); } - private QueryRunner getQueryRunnerImpl(final Query query) + private QueryRunner getQueryRunnerImpl(Query query) { - return new UnionQueryRunner<>( - Iterables.transform( - query.getDataSource().getNames(), new Function() - { - @Override - public QueryRunner apply(String queryDataSource) - { - QueryRunner queryRunner = null; + QueryRunner queryRunner = null; + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); - for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { - final Task task = taskRunnerWorkItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { - final QueryRunner taskQueryRunner = task.getQueryRunner(query); + for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { + final Task task = taskRunnerWorkItem.getTask(); + if (task.getDataSource().equals(queryDataSource)) { + final QueryRunner taskQueryRunner = task.getQueryRunner(query); - if (taskQueryRunner != null) { - if (queryRunner == null) { - queryRunner = taskQueryRunner; - } else { - log.makeAlert("Found too many query runners for datasource") - .addData("dataSource", queryDataSource) - .emit(); - } - } - } - } - if (queryRunner != null) { - return queryRunner; - } else { - return new NoopQueryRunner(); - } - } - } - ), conglomerate.findFactory(query).getToolchest() - ); + if (taskQueryRunner != null) { + if (queryRunner == null) { + queryRunner = taskQueryRunner; + } else { + log.makeAlert("Found too many query runners for datasource") + .addData("dataSource", queryDataSource) + .emit(); + } + } + } + } + return queryRunner == null ? new NoopQueryRunner() : queryRunner; } private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index cb7cf7ca177..d731bdc8b29 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -18,7 +18,7 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -26,35 +26,34 @@ import java.util.Map; public class UnionQueryRunner implements QueryRunner { - private final Iterable baseRunners; + private final QueryRunner baseRunner; private final QueryToolChest> toolChest; public UnionQueryRunner( - Iterable baseRunners, + QueryRunner baseRunner, QueryToolChest> toolChest ) { - this.baseRunners = baseRunners; + this.baseRunner = baseRunner; this.toolChest = toolChest; } @Override public Sequence run(final Query query, final Map responseContext) { - if (Iterables.size(baseRunners) == 1) { - return Iterables.getOnlyElement(baseRunners).run(query, responseContext); - } else { + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof UnionDataSource) { return toolChest.mergeSequencesUnordered( Sequences.simple( - Iterables.transform( - baseRunners, - new Function>() + Lists.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() { @Override - public Sequence apply(QueryRunner singleRunner) + public Sequence apply(DataSource singleSource) { - return singleRunner.run( - query, + return baseRunner.run( + query.withDataSource(singleSource), responseContext ); } @@ -62,6 +61,8 @@ public class UnionQueryRunner implements QueryRunner ) ) ); + } else { + return baseRunner.run(query, responseContext); } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index f6e088159b2..65c7dbea899 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -253,20 +253,19 @@ public class QueryRunnerTestHelper return Arrays.asList( new Object[][]{ { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) }, { - makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource) + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) }, { makeUnionQueryRunner( factory, - new QueryableIndexSegment(segmentId, mergedRealtimeIndex), - unionDataSource + new QueryableIndexSegment(segmentId, mergedRealtimeIndex) ) }, { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); @@ -341,28 +340,17 @@ public class QueryRunnerTestHelper } public static QueryRunner makeUnionQueryRunner( - final QueryRunnerFactory> factory, - final Segment adapter, - final DataSource unionDataSource + QueryRunnerFactory> factory, + Segment adapter ) { return new FinalizeResultsQueryRunner( factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().mergeResults( new UnionQueryRunner( - Iterables.transform( - unionDataSource.getNames(), new Function() - { - @Nullable - @Override - public QueryRunner apply(@Nullable String input) - { - return new BySegmentQueryRunner( - segmentId, adapter.getDataInterval().getStart(), - factory.createRunner(adapter) - ); - } - } + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) ), factory.getToolchest() ) diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java new file mode 100644 index 00000000000..0cb24e6124b --- /dev/null +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -0,0 +1,86 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.query; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import junit.framework.Assert; +import org.junit.Test; + +public class UnionQueryRunnerTest +{ + @Test + public void testUnionQueryRunner() + { + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + // verify that table datasource is passed to baseQueryRunner + Assert.assertTrue(query.getDataSource() instanceof TableDataSource); + String dsName = Iterables.getOnlyElement(query.getDataSource().getNames()); + if (dsName.equals("ds1")) { + responseContext.put("ds1", "ds1"); + return Sequences.simple(Arrays.asList(1, 2, 3)); + } else if (dsName.equals("ds2")) { + responseContext.put("ds2", "ds2"); + return Sequences.simple(Arrays.asList(4, 5, 6)); + } else { + throw new AssertionError("Unexpected DataSource"); + } + } + }; + UnionQueryRunner runner = new UnionQueryRunner( + baseRunner, + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ); + // Make a dummy query with Union datasource + Query q = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Arrays.asList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .intervals("2014-01-01T00:00:00Z/2015-01-01T00:00:00Z") + .aggregators(QueryRunnerTestHelper.commonAggregators) + .build(); + Map responseContext = Maps.newHashMap(); + Sequence result = runner.run(q, responseContext); + List res = Sequences.toList(result, Lists.newArrayList()); + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); + + // verify response context + Assert.assertEquals(2, responseContext.size()); + Assert.assertEquals("ds1", responseContext.get("ds1")); + Assert.assertEquals("ds2", responseContext.get("ds2")); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 9e8032caa5a..68afc07bda1 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -137,99 +137,84 @@ public class TimeSeriesUnionQueryRunnerTest ) ) .build(); - QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); QueryRunner mergingrunner = toolChest.mergeResults( new UnionQueryRunner>( - (Iterable) Arrays.asList( - new QueryRunner>() - { - @Override - public Sequence> run( - Query> query, - Map context - ) - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 1L, - "idx", - 2L - ) - ) - ), - new Result<>( - new DateTime("2011-04-03"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 3L, - "idx", - 4L - ) - ) - ) - ) - ); - } - }, - new QueryRunner>() - { - - @Override - public Sequence> run( - Query> query, - Map context - ) - { - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-01"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 5L, - "idx", - 6L - ) - + new QueryRunner>() + { + @Override + public Sequence> run(Query> query, + Map responseContext + ) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L ) - ), - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 7L, - "idx", - 8L - ) - ) - ), - new Result<>( - new DateTime("2011-04-04"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 9L, - "idx", - 10L - ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L ) ) ) - ); - } - } + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L + ) + ) + ) + ) + ); } - ), + } + }, toolChest ) ); diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 9b788784f48..706e7ee652d 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -39,7 +39,6 @@ import io.druid.query.QueryWatcher; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -260,27 +259,11 @@ public class BrokerServerView implements TimelineServerView @Override - public TimelineLookup getTimeline(DataSource dataSource) + public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - final List tables = dataSource.getNames(); + String table = Iterables.getOnlyElement(dataSource.getNames()); synchronized (lock) { - if (tables.size() == 1) { - return timelines.get(tables.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - tables, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return timelines.get(input); - } - } - ) - ); - } + return timelines.get(table); } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 818aac2f519..5566f14c387 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -148,34 +148,23 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final List names = query.getDataSource().getNames(); - return new UnionQueryRunner<>( - Iterables.transform( - names, new Function() - { - @Override - public QueryRunner apply(String input) - { - Iterable chiefsOfDataSource = chiefs.get(input); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource, new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) - ); - } - } - ), conglomerate.findFactory(query).getToolchest() + + Iterable chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + chiefsOfDataSource, new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + } + ) + ) ); } diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 4fced547fee..8af8b31dd2b 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -29,6 +29,7 @@ import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; +import io.druid.query.UnionQueryRunner; import java.util.Map; import org.joda.time.Interval; @@ -82,19 +83,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( toolChest.mergeResults( - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( + new UnionQueryRunner( + toolChest.preMergeQueryDecoration( + new RetryQueryRunner( baseClient, toolChest, retryConfig, - objectMapper) + objectMapper + ) + ), + toolChest ) ) ), toolChest ); - final Map context = query.getContext(); PostProcessingOperator postProcessing = null; if (context != null) { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 982c65ea73d..0cbb504f6b3 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -51,6 +51,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; @@ -60,7 +61,6 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineObjectHolder; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; @@ -260,11 +260,12 @@ public class ServerManager implements QuerySegmentWalker final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); DataSource dataSource = query.getDataSource(); - if (dataSource instanceof QueryDataSource) { + if (!(dataSource instanceof TableDataSource)) { throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); } + String dataSourceName = getDataSourceName(dataSource); - final TimelineLookup timeline = getTimelineLookup(query.getDataSource()); + final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner(); @@ -334,26 +335,9 @@ public class ServerManager implements QuerySegmentWalker ); } - private TimelineLookup getTimelineLookup(DataSource dataSource) + private String getDataSourceName(DataSource dataSource) { - final List names = dataSource.getNames(); - if (names.size() == 1) { - return dataSources.get(names.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - names, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return dataSources.get(input); - } - } - ) - ); - } + return Iterables.getOnlyElement(dataSource.getNames()); } @Override @@ -369,7 +353,11 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest> toolChest = factory.getToolchest(); - final TimelineLookup timeline = getTimelineLookup(query.getDataSource()); + String dataSourceName = getDataSourceName(query.getDataSource()); + + final VersionedIntervalTimeline timeline = dataSources.get( + dataSourceName + ); if (timeline == null) { return new NoopQueryRunner();