diff --git a/common/src/main/java/io/druid/timeline/TimelineLookup.java b/common/src/main/java/io/druid/timeline/TimelineLookup.java
new file mode 100644
index 00000000000..2fc343ac9b2
--- /dev/null
+++ b/common/src/main/java/io/druid/timeline/TimelineLookup.java
@@ -0,0 +1,44 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.timeline;
+
+import io.druid.timeline.partition.PartitionHolder;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+
+public interface TimelineLookup<VersionType, ObjectType>
+{
+
+  /**
+   * Does a lookup for the objects representing the given time interval. Will *only* return
+   * PartitionHolders that are complete.
+   *
+   * @param interval interval to find objects for
+   *
+   * @return Holders representing the interval that the objects exist for, PartitionHolders
+   *         are guaranteed to be complete
+   */
+  public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
+
+  public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
+
+}
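For orientation, and not part of the patch itself: a minimal sketch of a consumer that depends only on the new TimelineLookup contract, so it works unchanged against a single VersionedIntervalTimeline or the union wrapper added below. The class and method names here are hypothetical.

// Hypothetical helper, shown only to illustrate the TimelineLookup contract.
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

public class TimelineLookupUsageSketch
{
  public static <VersionType, ObjectType> int countHolders(
      TimelineLookup<VersionType, ObjectType> lookup,
      Interval interval
  )
  {
    int count = 0;
    for (TimelineObjectHolder<VersionType, ObjectType> holder : lookup.lookup(interval)) {
      // lookup() only returns complete PartitionHolders; the check below is redundant
      // and is kept purely to show how the two types relate.
      PartitionHolder<ObjectType> partitions = holder.getObject();
      if (partitions.isComplete()) {
        count++;
      }
    }
    return count;
  }
}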
diff --git a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java b/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java
new file mode 100644
index 00000000000..d5cf8f89f08
--- /dev/null
+++ b/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java
@@ -0,0 +1,54 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.timeline;
+
+import com.google.common.collect.Lists;
+import io.druid.timeline.partition.PartitionHolder;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+
+public class UnionTimeLineLookup<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
+{
+  Iterable<TimelineLookup<VersionType, ObjectType>> delegates;
+  public UnionTimeLineLookup(Iterable<TimelineLookup<VersionType, ObjectType>> delegates) {
+    this.delegates = delegates;
+  }
+  @Override
+  public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
+  {
+    List<TimelineObjectHolder<VersionType, ObjectType>> rv = Lists.newArrayList();
+    for (TimelineLookup<VersionType, ObjectType> delegate : delegates) {
+      rv.addAll(delegate.lookup(interval));
+    }
+    return rv;
+  }
+
+  public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version) {
+    for (TimelineLookup<VersionType, ObjectType> delegate : delegates) {
+      final PartitionHolder<ObjectType> entry = delegate.findEntry(interval, version);
+      if (entry != null) {
+        return entry;
+      }
+    }
+    return null;
+  }
+}
diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
index 903480e2dac..1c73782c8d5 100644
--- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
+++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
@@ -58,7 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
  * so, remove the overshadowed elements and you have effectively updated your data set without any user impact.
  */
-public class VersionedIntervalTimeline<VersionType, ObjectType>
+public class VersionedIntervalTimeline<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
 {
   private static final Logger log = new Logger(VersionedIntervalTimeline.class);
 
diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java
index f14bb180f62..09eba3bf5be 100644
--- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java
@@ -22,49 +22,47 @@
 package io.druid.query;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 
+import java.util.List;
+
 public class UnionQueryRunner<T> implements QueryRunner<T>
 {
-  private final QueryRunner<T> baseRunner;
+  private final Iterable<QueryRunner<T>> baseRunners;
   private final QueryToolChest<T, Query<T>> toolChest;
 
   public UnionQueryRunner(
-      QueryRunner<T> baseRunner,
+      Iterable<QueryRunner<T>> baseRunners,
       QueryToolChest<T, Query<T>> toolChest
   )
   {
-    this.baseRunner = baseRunner;
+    this.baseRunners = baseRunners;
    this.toolChest = toolChest;
   }
 
   @Override
   public Sequence<T> run(final Query<T> query)
   {
-    DataSource dataSource = query.getDataSource();
-    if (dataSource instanceof UnionDataSource) {
       return toolChest.mergeSequencesUnordered(
           Sequences.simple(
-              Lists.transform(
-                  ((UnionDataSource) dataSource).getDataSources(),
-                  new Function<DataSource, Sequence<T>>()
+              Iterables.transform(
+                  baseRunners,
+                  new Function<QueryRunner<T>, Sequence<T>>()
                   {
                     @Override
-                    public Sequence<T> apply(DataSource singleSource)
+                    public Sequence<T> apply(QueryRunner<T> singleRunner)
                     {
-                      return baseRunner.run(
-                          query.withDataSource(singleSource)
+                      return singleRunner.run(
+                          query
                       );
                     }
                   }
              )
          )
      );
-    } else {
-      return baseRunner.run(query);
-    }
   }
 }
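A rough sketch, not part of the patch, of the delegation semantics introduced above: lookup() concatenates the holders returned by each delegate, and findEntry() returns the first non-null hit. The use of SingleElementPartitionChunk and the literal intervals, versions, and payload values are assumptions borrowed from the existing timeline tests, not something this patch defines.

// Illustrative sketch of UnionTimeLineLookup delegation; values are made up.
import com.google.common.collect.Ordering;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.UnionTimeLineLookup;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.joda.time.Interval;

import java.util.Arrays;

public class UnionTimelineLookupSketch
{
  public static void main(String[] args)
  {
    VersionedIntervalTimeline<String, Integer> ds1 =
        new VersionedIntervalTimeline<String, Integer>(Ordering.<String>natural());
    VersionedIntervalTimeline<String, Integer> ds2 =
        new VersionedIntervalTimeline<String, Integer>(Ordering.<String>natural());

    // SingleElementPartitionChunk is assumed here as the simplest chunk type.
    ds1.add(new Interval("2011-04-01/2011-04-02"), "v1", new SingleElementPartitionChunk<Integer>(1));
    ds2.add(new Interval("2011-04-01/2011-04-03"), "v1", new SingleElementPartitionChunk<Integer>(2));

    TimelineLookup<String, Integer> union = new UnionTimeLineLookup<String, Integer>(
        Arrays.<TimelineLookup<String, Integer>>asList(ds1, ds2)
    );

    // lookup() simply concatenates the holders produced by each delegate;
    // no merging or overshadowing happens across tables.
    for (TimelineObjectHolder<String, Integer> holder : union.lookup(new Interval("2011-04-01/2011-04-03"))) {
      System.out.println(holder.getInterval() + " " + holder.getVersion());
    }

    // findEntry() returns the first non-null match, here the entry registered in ds1.
    System.out.println(union.findEntry(new Interval("2011-04-01/2011-04-02"), "v1"));
  }
}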
diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index deeaff563e8..15ba20ddb50 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -20,6 +20,7 @@
 package io.druid.query;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.granularity.QueryGranularity;
@@ -70,14 +71,14 @@ public class QueryRunnerTestHelper
   public static final UnionDataSource unionDataSource = new UnionDataSource(
       Lists.transform(
           Lists.newArrayList(dataSource, dataSource, dataSource, dataSource), new Function<String, TableDataSource>()
-      {
-        @Nullable
-        @Override
-        public TableDataSource apply(@Nullable String input)
-        {
-          return new TableDataSource(input);
-        }
-      }
+          {
+            @Nullable
+            @Override
+            public TableDataSource apply(@Nullable String input)
+            {
+              return new TableDataSource(input);
+            }
+          }
       )
   );
   public static final QueryGranularity dayGran = QueryGranularity.DAY;
@@ -214,7 +215,8 @@ public class QueryRunnerTestHelper
 
   @SuppressWarnings("unchecked")
   public static Collection<?> makeUnionQueryRunners(
-      QueryRunnerFactory factory
+      QueryRunnerFactory factory,
+      DataSource unionDataSource
   )
       throws IOException
   {
@@ -224,13 +226,17 @@ public class QueryRunnerTestHelper
     return Arrays.asList(
         new Object[][]{
             {
-                makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
+                makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource)
             },
             {
-                makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
+                makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource)
            },
            {
-                makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
+                makeUnionQueryRunner(
+                    factory,
+                    new QueryableIndexSegment(segmentId, mergedRealtimeIndex),
+                    unionDataSource
+                )
            }
        }
    );
@@ -251,17 +257,28 @@ public class QueryRunnerTestHelper
   }
 
   public static <T> QueryRunner<T> makeUnionQueryRunner(
-      QueryRunnerFactory<T, Query<T>> factory,
-      Segment adapter
+      final QueryRunnerFactory<T, Query<T>> factory,
+      final Segment adapter,
+      final DataSource unionDataSource
   )
   {
     return new FinalizeResultsQueryRunner(
         factory.getToolchest().postMergeQueryDecoration(
             factory.getToolchest().mergeResults(
                 new UnionQueryRunner(
-                    new BySegmentQueryRunner(
-                        segmentId, adapter.getDataInterval().getStart(),
-                        factory.createRunner(adapter)
+                    Iterables.transform(
+                        unionDataSource.getNames(), new Function<String, QueryRunner<T>>()
+                        {
+                          @Nullable
+                          @Override
+                          public QueryRunner<T> apply(@Nullable String input)
+                          {
+                            return new BySegmentQueryRunner(
+                                segmentId, adapter.getDataInterval().getStart(),
+                                factory.createRunner(adapter)
+                            );
+                          }
+                        }
                     ),
                     factory.getToolchest()
                 )
diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java
index 84ace0a798f..9a56a0208e4 100644
--- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java
@@ -19,7 +19,9 @@
 
 package io.druid.query.timeseries;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
@@ -41,9 +43,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -66,7 +70,8 @@ public class TimeSeriesUnionQueryRunnerTest
             new TimeseriesQueryQueryToolChest(new QueryConfig()),
             new TimeseriesQueryEngine(),
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
-        )
+        ),
+        QueryRunnerTestHelper.unionDataSource
     );
   }
 
@@ -139,79 +144,86 @@ public class TimeSeriesUnionQueryRunnerTest
     QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
     QueryRunner mergingrunner = toolChest.mergeResults(
         new UnionQueryRunner<Result<TimeseriesResultValue>>(
-            new QueryRunner<Result<TimeseriesResultValue>>()
-            {
-              @Override
-              public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
-              {
-                if (query.getDataSource().equals(new TableDataSource("ds1"))) {
-                  return Sequences.simple(
-                      Lists.newArrayList(
-                          new Result<TimeseriesResultValue>(
-                              new DateTime("2011-04-02"),
-                              new TimeseriesResultValue(
-                                  ImmutableMap.<String, Object>of(
-                                      "rows",
-                                      1L,
-                                      "idx",
-                                      2L
-                                  )
-                              )
-                          ),
-                          new Result<TimeseriesResultValue>(
-                              new DateTime("2011-04-03"),
-                              new TimeseriesResultValue(
-                                  ImmutableMap.<String, Object>of(
-                                      "rows",
-                                      3L,
-                                      "idx",
-                                      4L
-                                  )
-                              )
-                          )
-                      )
-                  );
-                } else {
-                  return Sequences.simple(
-                      Lists.newArrayList(
-                          new Result<TimeseriesResultValue>(
-                              new DateTime("2011-04-01"),
-                              new TimeseriesResultValue(
-                                  ImmutableMap.<String, Object>of(
-                                      "rows",
-                                      5L,
-                                      "idx",
-                                      6L
-                                  )
-                              )
-                          ),
-                          new Result<TimeseriesResultValue>(
-                              new DateTime("2011-04-02"),
-                              new TimeseriesResultValue(
-                                  ImmutableMap.<String, Object>of(
-                                      "rows",
-                                      7L,
-                                      "idx",
-                                      8L
-                                  )
-                              )
-                          ),
-                          new Result<TimeseriesResultValue>(
-                              new DateTime("2011-04-04"),
-                              new TimeseriesResultValue(
-                                  ImmutableMap.<String, Object>of(
-                                      "rows",
-                                      9L,
-                                      "idx",
-                                      10L
-                                  )
-                              )
-                          )
-                      )
-                  );
-                }
-              }
-            },
+            (Iterable) Arrays.asList(
+                new QueryRunner<Result<TimeseriesResultValue>>()
+                {
+                  @Override
+                  public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
+                  {
+                    return Sequences.simple(
+                        Lists.newArrayList(
+                            new Result<TimeseriesResultValue>(
+                                new DateTime("2011-04-02"),
+                                new TimeseriesResultValue(
+                                    ImmutableMap.<String, Object>of(
+                                        "rows",
+                                        1L,
+                                        "idx",
+                                        2L
+                                    )
+                                )
+                            ),
+                            new Result<TimeseriesResultValue>(
+                                new DateTime("2011-04-03"),
+                                new TimeseriesResultValue(
+                                    ImmutableMap.<String, Object>of(
+                                        "rows",
+                                        3L,
+                                        "idx",
+                                        4L
+                                    )
+                                )
+                            )
+                        )
+                    );
+                  }
+                },
+                new QueryRunner<Result<TimeseriesResultValue>>()
+                {
+                  @Override
+                  public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
+                  {
+                    return Sequences.simple(
+                        Lists.newArrayList(
+                            new Result<TimeseriesResultValue>(
+                                new DateTime("2011-04-01"),
+                                new TimeseriesResultValue(
+                                    ImmutableMap.<String, Object>of(
+                                        "rows",
+                                        5L,
+                                        "idx",
+                                        6L
+                                    )
+                                )
+                            ),
+                            new Result<TimeseriesResultValue>(
+                                new DateTime("2011-04-02"),
+                                new TimeseriesResultValue(
+                                    ImmutableMap.<String, Object>of(
+                                        "rows",
+                                        7L,
+                                        "idx",
+                                        8L
+                                    )
+                                )
+                            ),
+                            new Result<TimeseriesResultValue>(
+                                new DateTime("2011-04-04"),
+                                new TimeseriesResultValue(
+                                    ImmutableMap.<String, Object>of(
+                                        "rows",
+                                        9L,
+                                        "idx",
+                                        10L
+                                    )
+                                )
+                            )
+                        )
+                    );
+                  }
+                }
+            ),
             toolChest
         )
     );
diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
index 7dc7b645cad..5034fda9865 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java
@@ -67,7 +67,8 @@ public class TopNUnionQueryTest
                 TestQueryRunners.getPool(),
                 new TopNQueryQueryToolChest(new TopNQueryConfig()),
                 QueryRunnerTestHelper.NOOP_QUERYWATCHER
-            )
+            ),
+            QueryRunnerTestHelper.unionDataSource
         )
     );
     retVal.addAll(
@@ -85,7 +86,8 @@ public class TopNUnionQueryTest
             ),
             new TopNQueryQueryToolChest(new TopNQueryConfig()),
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
-        )
+        ),
+        QueryRunnerTestHelper.unionDataSource
         )
     );
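For context, and not part of the patch: the rewritten UnionQueryRunner relies on QueryToolChest.mergeSequencesUnordered() to combine the per-table result sequences, which is what the union tests above exercise end to end. A standalone sketch of that merge point against the timeseries toolchest used in these tests; the timestamps and row counts are made up.

// Illustrative sketch of the merge point used by UnionQueryRunner.
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.QueryConfig;
import io.druid.query.Result;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;

import java.util.Arrays;
import java.util.List;

public class UnionMergeSketch
{
  public static void main(String[] args)
  {
    Sequence<Result<TimeseriesResultValue>> table1 = Sequences.simple(
        Arrays.asList(
            new Result<TimeseriesResultValue>(
                new DateTime("2011-04-02"),
                new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 1L))
            )
        )
    );
    Sequence<Result<TimeseriesResultValue>> table2 = Sequences.simple(
        Arrays.asList(
            new Result<TimeseriesResultValue>(
                new DateTime("2011-04-01"),
                new TimeseriesResultValue(ImmutableMap.<String, Object>of("rows", 5L))
            )
        )
    );

    TimeseriesQueryQueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
    // One sequence per underlying table, combined without any ordering guarantee.
    Sequence<Result<TimeseriesResultValue>> merged = toolChest.mergeSequencesUnordered(
        Sequences.simple(Arrays.asList(table1, table2))
    );

    List<Result<TimeseriesResultValue>> results =
        Sequences.toList(merged, Lists.<Result<TimeseriesResultValue>>newArrayList());
    System.out.println(results.size()); // 2 results, one per table
  }
}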
diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java
index c0403c129e7..76b1207e508 100644
--- a/server/src/main/java/io/druid/client/BrokerServerView.java
+++ b/server/src/main/java/io/druid/client/BrokerServerView.java
@@ -20,6 +20,8 @@
 package io.druid.client;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
@@ -37,10 +39,16 @@ import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.QueryWatcher;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineLookup;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.UnionTimeLineLookup;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.PartitionChunk;
+import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -245,11 +253,27 @@ public class BrokerServerView implements TimelineServerView
 
   @Override
-  public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
+  public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
   {
-    String table = Iterables.getOnlyElement(dataSource.getNames());
+    final List<String> tables = dataSource.getNames();
     synchronized (lock) {
-      return timelines.get(table);
+      if (tables.size() == 1) {
+        return timelines.get(tables.get(0));
+      } else {
+        return new UnionTimeLineLookup<>(
+            Iterables.transform(
+                tables, new Function<String, TimelineLookup<String, ServerSelector>>()
+                {
+
+                  @Override
+                  public TimelineLookup<String, ServerSelector> apply(String input)
+                  {
+                    return timelines.get(input);
+                  }
+                }
+            )
+        );
+      }
     }
   }
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index f189e060670..087bc96927d 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -56,6 +56,7 @@ import io.druid.query.aggregation.MetricManipulatorFns;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.PartitionChunk;
@@ -149,7 +150,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
 
     final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
 
-    VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
+    TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
     if (timeline == null) {
       return Sequences.empty();
     }
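Illustrative only, not part of the patch: the DataSource.getNames() contract that the branch above (and ServerManager below) relies on — a single name selects the per-table timeline directly, while several names trigger the UnionTimeLineLookup path. The table names are made up.

// Hypothetical example of getNames() on table vs. union data sources.
import com.google.common.collect.ImmutableList;
import io.druid.query.DataSource;
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;

public class DataSourceNamesSketch
{
  public static void main(String[] args)
  {
    DataSource single = new TableDataSource("wikipedia");
    DataSource union = new UnionDataSource(
        ImmutableList.of(new TableDataSource("wikipedia"), new TableDataSource("twitter"))
    );

    System.out.println(single.getNames()); // one name  -> timelines.get(name)
    System.out.println(union.getNames());  // two names -> UnionTimeLineLookup over both tables
  }
}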
diff --git a/server/src/main/java/io/druid/client/TimelineServerView.java b/server/src/main/java/io/druid/client/TimelineServerView.java
index 0a6a43c8fdb..b3cc8cf6832 100644
--- a/server/src/main/java/io/druid/client/TimelineServerView.java
+++ b/server/src/main/java/io/druid/client/TimelineServerView.java
@@ -22,12 +22,13 @@ package io.druid.client;
 import io.druid.client.selector.ServerSelector;
 import io.druid.query.DataSource;
 import io.druid.query.QueryRunner;
+import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.VersionedIntervalTimeline;
 
 /**
  */
 public interface TimelineServerView extends ServerView
 {
-  VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource);
+  TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
 
   <T> QueryRunner<T> getQueryRunner(DruidServer server);
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
index a1cfb220972..3998c7503b1 100644
--- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
+++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.realtime;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -40,6 +41,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SegmentDescriptor;
+import io.druid.query.UnionQueryRunner;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Plumber;
@@ -47,6 +49,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -117,11 +120,28 @@ public class RealtimeManager implements QuerySegmentWalker
   }
 
   @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
   {
-    final FireChief chief = chiefs.get(getDataSourceName(query));
-
-    return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query);
+    final List<String> names = query.getDataSource().getNames();
+    if (names.size() == 1) {
+      final FireChief chief = chiefs.get(names.get(0));
+      return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query);
+    } else {
+      return new UnionQueryRunner<>(
+          Iterables.transform(
+              names, new Function<String, QueryRunner<T>>()
+              {
+                @Nullable
+                @Override
+                public QueryRunner<T> apply(@Nullable String input)
+                {
+                  final FireChief chief = chiefs.get(input);
+                  return chief == null ? new NoopQueryRunner() : chief.getQueryRunner(query);
+                }
+              }
+          ), conglomerate.findFactory(query).getToolchest()
+      );
+    }
   }
 
   private String getDataSourceName(Query query)
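A sketch, not part of the patch, of the fan-out-with-fallback pattern the realtime node now uses, pulled out into a hypothetical standalone helper: one runner per data source name, with a no-op runner where nothing is serving that name, all combined by UnionQueryRunner.

// Hypothetical helper; the Map parameter stands in for the chiefs lookup.
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.UnionQueryRunner;

import java.util.List;
import java.util.Map;

public class UnionRunnerSketch
{
  public static <T> QueryRunner<T> makeUnionRunner(
      final Map<String, QueryRunner<T>> runnersByDataSource,
      final List<String> names,
      final QueryToolChest<T, Query<T>> toolChest
  )
  {
    return new UnionQueryRunner<T>(
        Iterables.transform(
            names,
            new Function<String, QueryRunner<T>>()
            {
              @Override
              public QueryRunner<T> apply(String name)
              {
                // Missing data sources degrade to an empty result rather than an error.
                final QueryRunner<T> runner = runnersByDataSource.get(name);
                return runner == null ? new NoopQueryRunner<T>() : runner;
              }
            }
        ),
        toolChest
    );
  }
}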
diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
index 311f0162c5a..c81fa618b2f 100644
--- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
@@ -82,7 +82,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
     final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
         toolChest.postMergeQueryDecoration(
             toolChest.mergeResults(
-                new UnionQueryRunner<T>(
                     new MetricsEmittingQueryRunner<T>(
                         emitter,
                         new Function<Query<T>, ServiceMetricEvent.Builder>()
@@ -94,9 +93,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
                           }
                         },
                         toolChest.preMergeQueryDecoration(baseClient)
-                    ).withWaitMeasuredFromNow(),
-                    toolChest
-                )
+                    ).withWaitMeasuredFromNow()
             )
         ),
         toolChest
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index e79e4a3a978..e5445774e0a 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.selector.ServerSelector;
 import io.druid.collections.CountingMap;
 import io.druid.guice.annotations.Processing;
 import io.druid.guice.annotations.Smile;
@@ -42,6 +43,7 @@ import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.MetricsEmittingQueryRunner;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
+import io.druid.query.QueryDataSource;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -57,7 +59,9 @@ import io.druid.segment.Segment;
 import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.UnionTimeLineLookup;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.PartitionHolder;
@@ -67,6 +71,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
@@ -163,7 +168,7 @@ public class ServerManager implements QuerySegmentWalker
       VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
 
       if (loadedIntervals == null) {
-        loadedIntervals = new VersionedIntervalTimeline(Ordering.natural());
+        loadedIntervals = new VersionedIntervalTimeline<>(Ordering.natural());
         dataSources.put(dataSource, loadedIntervals);
       }
 
@@ -250,12 +255,11 @@ public class ServerManager implements QuerySegmentWalker
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
 
     DataSource dataSource = query.getDataSource();
-    if (!(dataSource instanceof TableDataSource)) {
+    if (dataSource instanceof QueryDataSource) {
       throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
     }
-    String dataSourceName = getDataSourceName(dataSource);
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
+    final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
 
     if (timeline == null) {
       return new NoopQueryRunner();
     }
@@ -302,7 +306,6 @@ public class ServerManager implements QuerySegmentWalker
                             holder.getVersion(),
                             input.getChunkNumber()
                         )
-                    );
                   }
                 }
@@ -319,9 +322,26 @@ public class ServerManager implements QuerySegmentWalker
     return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
   }
 
-  private String getDataSourceName(DataSource dataSource)
+  private TimelineLookup<String, ReferenceCountingSegment> getTimelineLookup(DataSource dataSource)
   {
-    return Iterables.getOnlyElement(dataSource.getNames());
+    final List<String> names = dataSource.getNames();
+    if (names.size() == 1) {
+      return dataSources.get(names.get(0));
+    } else {
+      return new UnionTimeLineLookup<>(
+          Iterables.transform(
+              names, new Function<String, TimelineLookup<String, ReferenceCountingSegment>>()
+              {
+
+                @Override
+                public TimelineLookup<String, ReferenceCountingSegment> apply(String input)
+                {
+                  return dataSources.get(input);
+                }
+              }
+          )
+      );
+    }
   }
 
   @Override
@@ -337,9 +357,7 @@ public class ServerManager implements QuerySegmentWalker
     final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
 
-    String dataSourceName = getDataSourceName(query.getDataSource());
-
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
+    final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
 
     if (timeline == null) {
       return new NoopQueryRunner();
     }
@@ -354,6 +372,7 @@ public class ServerManager implements QuerySegmentWalker
               @SuppressWarnings("unchecked")
               public Iterable<QueryRunner<T>> apply(SegmentDescriptor input)
               {
+
                 final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
                     input.getInterval(), input.getVersion()
                 );