From 573aa96bd6d84838025792fde22e85b270c24e67 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 15 Sep 2015 02:24:12 +0530 Subject: [PATCH] fix #1727 - Union bySegment queries fix Fixes #1727. revert to doing merging for results for union queries on broker. revert unrelated changes Add test for union query runner Add test remove unused imports fix imports fix renamed file fix test update docs. --- .../druid/timeline/UnionTimeLineLookup.java | 78 --------- .../VersionedIntervalTimelineTest.java | 75 --------- docs/content/querying/datasource.md | 1 + .../overlord/ThreadPoolTaskRunner.java | 52 +++--- .../java/io/druid/query/UnionQueryRunner.java | 27 +-- .../io/druid/query/QueryRunnerTestHelper.java | 30 +--- .../io/druid/query/UnionQueryRunnerTest.java | 86 ++++++++++ .../TimeSeriesUnionQueryRunnerTest.java | 157 ++++++++---------- .../io/druid/client/BrokerServerView.java | 23 +-- .../segment/realtime/RealtimeManager.java | 45 ++--- .../server/ClientQuerySegmentWalker.java | 12 +- .../server/coordination/ServerManager.java | 34 ++-- 12 files changed, 239 insertions(+), 381 deletions(-) delete mode 100644 common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java create mode 100644 processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java diff --git a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java b/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java deleted file mode 100644 index 18302fe08bc..00000000000 --- a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.timeline; - -import com.google.common.base.Function; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.metamx.common.guava.Comparators; -import io.druid.timeline.partition.PartitionHolder; -import org.joda.time.Interval; - -import java.util.Comparator; - - -public class UnionTimeLineLookup implements TimelineLookup -{ - Iterable> delegates; - - public UnionTimeLineLookup(Iterable> delegates) - { - // delegate can be null in case there is no segment loaded for the dataSource on this node - this.delegates = Iterables.filter(delegates, Predicates.notNull()); - } - - @Override - public Iterable> lookup(final Interval interval) - { - return Iterables.mergeSorted( - Iterables.transform( - delegates, - new Function, Iterable>>() - { - @Override - public Iterable> apply(TimelineLookup input) - { - return input.lookup(interval); - } - } - ), - new Comparator>() - { - @Override - public int compare( - TimelineObjectHolder o1, TimelineObjectHolder o2 - ) - { - return Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval()); - } - } - ); - } - - public PartitionHolder findEntry(Interval interval, VersionType version) - { - for (TimelineLookup delegate : delegates) { - final PartitionHolder entry = delegate.findEntry(interval, version); - if (entry != null) { - return entry; - } - } - return null; - } -} diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java index c6651199351..2f19ad3b333 100644 --- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java @@ -1596,79 +1596,4 @@ public class VersionedIntervalTimelineTest return new VersionedIntervalTimeline(Ordering.natural()); } - @Test - public void testUnionTimeLineLookup() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - timeline - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) - ); - } - - @Test - public void testUnionTimeLineLookupNonExistentDelegates() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - null, - timeline, - null - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) ); - } - - @Test - public void testUnionTimeLineLookupReturnsSortedValues() - { - timeline = makeStringIntegerTimeline(); - add("2011-04-02/2011-04-06", "1", 1); - add("2011-04-03/2011-04-09", "9", 2); - VersionedIntervalTimeline t1 = timeline; - timeline = makeStringIntegerTimeline(); - add("2011-04-01/2011-04-03", "2", 1); - add("2011-04-03/2011-04-10", "8", 2); - VersionedIntervalTimeline t2 = timeline; - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - t1, t2 - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-03", "2", 1), - createExpected("2011-04-02/2011-04-03", "1", 1), - createExpected("2011-04-03/2011-04-09", "9", 2), - createExpected("2011-04-03/2011-04-10", "8", 2) - - ), - (List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11"))) - ); - } - - - } diff --git a/docs/content/querying/datasource.md b/docs/content/querying/datasource.md index 045aae11929..68fd0d9dba4 100644 --- a/docs/content/querying/datasource.md +++ b/docs/content/querying/datasource.md @@ -28,6 +28,7 @@ This data source unions two or more table data sources. ``` Note that the data sources being unioned should have the same schema. +Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes. ### Query Data Source diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 8e598bee2d0..462179d0b15 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -153,43 +153,29 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker return getQueryRunnerImpl(query); } - private QueryRunner getQueryRunnerImpl(final Query query) + private QueryRunner getQueryRunnerImpl(Query query) { - return new UnionQueryRunner<>( - Iterables.transform( - query.getDataSource().getNames(), new Function() - { - @Override - public QueryRunner apply(String queryDataSource) - { - QueryRunner queryRunner = null; + QueryRunner queryRunner = null; + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); - for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { - final Task task = taskRunnerWorkItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { - final QueryRunner taskQueryRunner = task.getQueryRunner(query); + for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { + final Task task = taskRunnerWorkItem.getTask(); + if (task.getDataSource().equals(queryDataSource)) { + final QueryRunner taskQueryRunner = task.getQueryRunner(query); - if (taskQueryRunner != null) { - if (queryRunner == null) { - queryRunner = taskQueryRunner; - } else { - log.makeAlert("Found too many query runners for datasource") - .addData("dataSource", queryDataSource) - .emit(); - } - } - } - } - if (queryRunner != null) { - return queryRunner; - } else { - return new NoopQueryRunner(); - } - } - } - ), conglomerate.findFactory(query).getToolchest() - ); + if (taskQueryRunner != null) { + if (queryRunner == null) { + queryRunner = taskQueryRunner; + } else { + log.makeAlert("Found too many query runners for datasource") + .addData("dataSource", queryDataSource) + .emit(); + } + } + } + } + return queryRunner == null ? new NoopQueryRunner() : queryRunner; } private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index cb7cf7ca177..d731bdc8b29 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -18,7 +18,7 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -26,35 +26,34 @@ import java.util.Map; public class UnionQueryRunner implements QueryRunner { - private final Iterable baseRunners; + private final QueryRunner baseRunner; private final QueryToolChest> toolChest; public UnionQueryRunner( - Iterable baseRunners, + QueryRunner baseRunner, QueryToolChest> toolChest ) { - this.baseRunners = baseRunners; + this.baseRunner = baseRunner; this.toolChest = toolChest; } @Override public Sequence run(final Query query, final Map responseContext) { - if (Iterables.size(baseRunners) == 1) { - return Iterables.getOnlyElement(baseRunners).run(query, responseContext); - } else { + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof UnionDataSource) { return toolChest.mergeSequencesUnordered( Sequences.simple( - Iterables.transform( - baseRunners, - new Function>() + Lists.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() { @Override - public Sequence apply(QueryRunner singleRunner) + public Sequence apply(DataSource singleSource) { - return singleRunner.run( - query, + return baseRunner.run( + query.withDataSource(singleSource), responseContext ); } @@ -62,6 +61,8 @@ public class UnionQueryRunner implements QueryRunner ) ) ); + } else { + return baseRunner.run(query, responseContext); } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index f6e088159b2..65c7dbea899 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -253,20 +253,19 @@ public class QueryRunnerTestHelper return Arrays.asList( new Object[][]{ { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) }, { - makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource) + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) }, { makeUnionQueryRunner( factory, - new QueryableIndexSegment(segmentId, mergedRealtimeIndex), - unionDataSource + new QueryableIndexSegment(segmentId, mergedRealtimeIndex) ) }, { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); @@ -341,28 +340,17 @@ public class QueryRunnerTestHelper } public static QueryRunner makeUnionQueryRunner( - final QueryRunnerFactory> factory, - final Segment adapter, - final DataSource unionDataSource + QueryRunnerFactory> factory, + Segment adapter ) { return new FinalizeResultsQueryRunner( factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().mergeResults( new UnionQueryRunner( - Iterables.transform( - unionDataSource.getNames(), new Function() - { - @Nullable - @Override - public QueryRunner apply(@Nullable String input) - { - return new BySegmentQueryRunner( - segmentId, adapter.getDataInterval().getStart(), - factory.createRunner(adapter) - ); - } - } + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) ), factory.getToolchest() ) diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java new file mode 100644 index 00000000000..0cb24e6124b --- /dev/null +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -0,0 +1,86 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.query; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import junit.framework.Assert; +import org.junit.Test; + +public class UnionQueryRunnerTest +{ + @Test + public void testUnionQueryRunner() + { + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + // verify that table datasource is passed to baseQueryRunner + Assert.assertTrue(query.getDataSource() instanceof TableDataSource); + String dsName = Iterables.getOnlyElement(query.getDataSource().getNames()); + if (dsName.equals("ds1")) { + responseContext.put("ds1", "ds1"); + return Sequences.simple(Arrays.asList(1, 2, 3)); + } else if (dsName.equals("ds2")) { + responseContext.put("ds2", "ds2"); + return Sequences.simple(Arrays.asList(4, 5, 6)); + } else { + throw new AssertionError("Unexpected DataSource"); + } + } + }; + UnionQueryRunner runner = new UnionQueryRunner( + baseRunner, + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ); + // Make a dummy query with Union datasource + Query q = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Arrays.asList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .intervals("2014-01-01T00:00:00Z/2015-01-01T00:00:00Z") + .aggregators(QueryRunnerTestHelper.commonAggregators) + .build(); + Map responseContext = Maps.newHashMap(); + Sequence result = runner.run(q, responseContext); + List res = Sequences.toList(result, Lists.newArrayList()); + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); + + // verify response context + Assert.assertEquals(2, responseContext.size()); + Assert.assertEquals("ds1", responseContext.get("ds1")); + Assert.assertEquals("ds2", responseContext.get("ds2")); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 9e8032caa5a..68afc07bda1 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -137,99 +137,84 @@ public class TimeSeriesUnionQueryRunnerTest ) ) .build(); - QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); QueryRunner mergingrunner = toolChest.mergeResults( new UnionQueryRunner>( - (Iterable) Arrays.asList( - new QueryRunner>() - { - @Override - public Sequence> run( - Query> query, - Map context - ) - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 1L, - "idx", - 2L - ) - ) - ), - new Result<>( - new DateTime("2011-04-03"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 3L, - "idx", - 4L - ) - ) - ) - ) - ); - } - }, - new QueryRunner>() - { - - @Override - public Sequence> run( - Query> query, - Map context - ) - { - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-01"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 5L, - "idx", - 6L - ) - + new QueryRunner>() + { + @Override + public Sequence> run(Query> query, + Map responseContext + ) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L ) - ), - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 7L, - "idx", - 8L - ) - ) - ), - new Result<>( - new DateTime("2011-04-04"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 9L, - "idx", - 10L - ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L ) ) ) - ); - } - } + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L + ) + ) + ) + ) + ); } - ), + } + }, toolChest ) ); diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 9b788784f48..706e7ee652d 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -39,7 +39,6 @@ import io.druid.query.QueryWatcher; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -260,27 +259,11 @@ public class BrokerServerView implements TimelineServerView @Override - public TimelineLookup getTimeline(DataSource dataSource) + public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - final List tables = dataSource.getNames(); + String table = Iterables.getOnlyElement(dataSource.getNames()); synchronized (lock) { - if (tables.size() == 1) { - return timelines.get(tables.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - tables, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return timelines.get(input); - } - } - ) - ); - } + return timelines.get(table); } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 818aac2f519..5566f14c387 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -148,34 +148,23 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final List names = query.getDataSource().getNames(); - return new UnionQueryRunner<>( - Iterables.transform( - names, new Function() - { - @Override - public QueryRunner apply(String input) - { - Iterable chiefsOfDataSource = chiefs.get(input); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource, new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) - ); - } - } - ), conglomerate.findFactory(query).getToolchest() + + Iterable chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + chiefsOfDataSource, new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + } + ) + ) ); } diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 4fced547fee..8af8b31dd2b 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -29,6 +29,7 @@ import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; +import io.druid.query.UnionQueryRunner; import java.util.Map; import org.joda.time.Interval; @@ -82,19 +83,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( toolChest.mergeResults( - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( + new UnionQueryRunner( + toolChest.preMergeQueryDecoration( + new RetryQueryRunner( baseClient, toolChest, retryConfig, - objectMapper) + objectMapper + ) + ), + toolChest ) ) ), toolChest ); - final Map context = query.getContext(); PostProcessingOperator postProcessing = null; if (context != null) { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 982c65ea73d..0cbb504f6b3 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -51,6 +51,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; @@ -60,7 +61,6 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineObjectHolder; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; @@ -260,11 +260,12 @@ public class ServerManager implements QuerySegmentWalker final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); DataSource dataSource = query.getDataSource(); - if (dataSource instanceof QueryDataSource) { + if (!(dataSource instanceof TableDataSource)) { throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); } + String dataSourceName = getDataSourceName(dataSource); - final TimelineLookup timeline = getTimelineLookup(query.getDataSource()); + final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner(); @@ -334,26 +335,9 @@ public class ServerManager implements QuerySegmentWalker ); } - private TimelineLookup getTimelineLookup(DataSource dataSource) + private String getDataSourceName(DataSource dataSource) { - final List names = dataSource.getNames(); - if (names.size() == 1) { - return dataSources.get(names.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - names, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return dataSources.get(input); - } - } - ) - ); - } + return Iterables.getOnlyElement(dataSource.getNames()); } @Override @@ -369,7 +353,11 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest> toolChest = factory.getToolchest(); - final TimelineLookup timeline = getTimelineLookup(query.getDataSource()); + String dataSourceName = getDataSourceName(query.getDataSource()); + + final VersionedIntervalTimeline timeline = dataSources.get( + dataSourceName + ); if (timeline == null) { return new NoopQueryRunner();