Union Queries move merge to historical

This commit is contained in:
nishantmonu51 2014-11-07 18:05:16 +05:30
parent df4604c392
commit 8bebb24fd5
13 changed files with 316 additions and 127 deletions

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.timeline;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.util.List;
public interface TimelineLookup<VersionType, ObjectType>
{
/**
* Does a lookup for the objects representing the given time interval. Will *only* return
* PartitionHolders that are complete.
*
* @param interval interval to find objects for
*
* @return Holders representing the interval that the objects exist for, PartitionHolders
* are guaranteed to be complete
*/
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.timeline;
import com.google.common.collect.Lists;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.util.List;
public class UnionTimeLineLookup<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{
Iterable<TimelineLookup<VersionType,ObjectType>> delegates;
public UnionTimeLineLookup( Iterable<TimelineLookup<VersionType,ObjectType>> delegates){
this.delegates = delegates;
}
@Override
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
{
List<TimelineObjectHolder<VersionType, ObjectType>> rv = Lists.newArrayList();
for(TimelineLookup<VersionType,ObjectType> delegate : delegates){
rv.addAll(delegate.lookup(interval));
}
return rv;
}
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version){
for(TimelineLookup<VersionType,ObjectType> delegate : delegates){
final PartitionHolder<ObjectType> entry = delegate.findEntry(interval, version);
if(entry != null){
return entry;
}
}
return null;
}
}

View File

@ -58,7 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if * to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
* so, remove the overshadowed elements and you have effectively updated your data set without any user impact. * so, remove the overshadowed elements and you have effectively updated your data set without any user impact.
*/ */
public class VersionedIntervalTimeline<VersionType, ObjectType> public class VersionedIntervalTimeline<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{ {
private static final Logger log = new Logger(VersionedIntervalTimeline.class); private static final Logger log = new Logger(VersionedIntervalTimeline.class);

View File

@ -22,49 +22,47 @@
package io.druid.query; package io.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import java.util.List;
public class UnionQueryRunner<T> implements QueryRunner<T> public class UnionQueryRunner<T> implements QueryRunner<T>
{ {
private final QueryRunner<T> baseRunner; private final Iterable<QueryRunner<T>> baseRunners;
private final QueryToolChest<T, Query<T>> toolChest; private final QueryToolChest<T, Query<T>> toolChest;
public UnionQueryRunner( public UnionQueryRunner(
QueryRunner<T> baseRunner, Iterable<QueryRunner<T>> baseRunners,
QueryToolChest<T, Query<T>> toolChest QueryToolChest<T, Query<T>> toolChest
) )
{ {
this.baseRunner = baseRunner; this.baseRunners = baseRunners;
this.toolChest = toolChest; this.toolChest = toolChest;
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered( return toolChest.mergeSequencesUnordered(
Sequences.simple( Sequences.simple(
Lists.transform( Iterables.transform(
((UnionDataSource) dataSource).getDataSources(), baseRunners,
new Function<DataSource, Sequence<T>>() new Function<QueryRunner<T>, Sequence<T>>()
{ {
@Override @Override
public Sequence<T> apply(DataSource singleSource) public Sequence<T> apply(QueryRunner<T> singleRunner)
{ {
return baseRunner.run( return singleRunner.run(
query.withDataSource(singleSource) query
); );
} }
} }
) )
) )
); );
} else {
return baseRunner.run(query);
}
} }
} }

View File

@ -20,6 +20,7 @@
package io.druid.query; package io.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
@ -214,7 +215,8 @@ public class QueryRunnerTestHelper
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static Collection<?> makeUnionQueryRunners( public static Collection<?> makeUnionQueryRunners(
QueryRunnerFactory factory QueryRunnerFactory factory,
DataSource unionDataSource
) )
throws IOException throws IOException
{ {
@ -224,13 +226,17 @@ public class QueryRunnerTestHelper
return Arrays.asList( return Arrays.asList(
new Object[][]{ new Object[][]{
{ {
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource)
}, },
{ {
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource)
}, },
{ {
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex),
unionDataSource
)
} }
} }
); );
@ -251,17 +257,28 @@ public class QueryRunnerTestHelper
} }
public static <T> QueryRunner<T> makeUnionQueryRunner( public static <T> QueryRunner<T> makeUnionQueryRunner(
QueryRunnerFactory<T, Query<T>> factory, final QueryRunnerFactory<T, Query<T>> factory,
Segment adapter final Segment adapter,
final DataSource unionDataSource
) )
{ {
return new FinalizeResultsQueryRunner<T>( return new FinalizeResultsQueryRunner<T>(
factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().postMergeQueryDecoration(
factory.getToolchest().mergeResults( factory.getToolchest().mergeResults(
new UnionQueryRunner<T>( new UnionQueryRunner<T>(
new BySegmentQueryRunner<T>( Iterables.transform(
unionDataSource.getNames(), new Function<String, QueryRunner<T>>()
{
@Nullable
@Override
public QueryRunner<T> apply(@Nullable String input)
{
return new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(), segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter) factory.createRunner(adapter)
);
}
}
), ),
factory.getToolchest() factory.getToolchest()
) )

View File

@ -19,7 +19,9 @@
package io.druid.query.timeseries; package io.druid.query.timeseries;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -41,9 +43,11 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -66,7 +70,8 @@ public class TimeSeriesUnionQueryRunnerTest
new TimeseriesQueryQueryToolChest(new QueryConfig()), new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) ),
QueryRunnerTestHelper.unionDataSource
); );
} }
@ -139,12 +144,12 @@ public class TimeSeriesUnionQueryRunnerTest
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig()); QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
QueryRunner mergingrunner = toolChest.mergeResults( QueryRunner mergingrunner = toolChest.mergeResults(
new UnionQueryRunner<Result<TimeseriesResultValue>>( new UnionQueryRunner<Result<TimeseriesResultValue>>(
(Iterable)Arrays.asList(
new QueryRunner<Result<TimeseriesResultValue>>() new QueryRunner<Result<TimeseriesResultValue>>()
{ {
@Override @Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query) public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
{ {
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple( return Sequences.simple(
Lists.newArrayList( Lists.newArrayList(
new Result<TimeseriesResultValue>( new Result<TimeseriesResultValue>(
@ -171,7 +176,14 @@ public class TimeSeriesUnionQueryRunnerTest
) )
) )
); );
} else { }
},
new QueryRunner<Result<TimeseriesResultValue>>(){
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query)
{
{
return Sequences.simple( return Sequences.simple(
Lists.newArrayList( Lists.newArrayList(
new Result<TimeseriesResultValue>( new Result<TimeseriesResultValue>(
@ -211,7 +223,7 @@ public class TimeSeriesUnionQueryRunnerTest
); );
} }
} }
}, }),
toolChest toolChest
) )
); );

View File

@ -67,7 +67,8 @@ public class TopNUnionQueryTest
TestQueryRunners.getPool(), TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig()), new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) ),
QueryRunnerTestHelper.unionDataSource
) )
); );
retVal.addAll( retVal.addAll(
@ -85,7 +86,8 @@ public class TopNUnionQueryTest
), ),
new TopNQueryQueryToolChest(new TopNQueryConfig()), new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) ),
QueryRunnerTestHelper.unionDataSource
) )
); );

View File

@ -20,6 +20,8 @@
package io.druid.client; package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
@ -37,10 +39,16 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher; import io.druid.query.QueryWatcher;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.UnionTimeLineLookup;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -245,11 +253,27 @@ public class BrokerServerView implements TimelineServerView
@Override @Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource) public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{ {
String table = Iterables.getOnlyElement(dataSource.getNames()); final List<String> tables = dataSource.getNames();
synchronized (lock) { synchronized (lock) {
return timelines.get(table); if (tables.size() == 1) {
return timelines.get(tables.get(0));
} else {
return new UnionTimeLineLookup<>(
Iterables.transform(
tables, new Function<String, TimelineLookup<String, ServerSelector>>()
{
@Override
public TimelineLookup<String, ServerSelector> apply(String input)
{
return timelines.get(input);
}
}
)
);
}
} }
} }

View File

@ -56,6 +56,7 @@ import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
@ -149,7 +150,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) { if (timeline == null) {
return Sequences.empty(); return Sequences.empty();
} }

View File

@ -22,12 +22,13 @@ package io.druid.client;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.query.DataSource; import io.druid.query.DataSource;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
/** /**
*/ */
public interface TimelineServerView extends ServerView public interface TimelineServerView extends ServerView
{ {
VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource); TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
<T> QueryRunner<T> getQueryRunner(DruidServer server); <T> QueryRunner<T> getQueryRunner(DruidServer server);
} }

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime; package io.druid.segment.realtime;
import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -40,6 +41,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
@ -47,6 +49,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -117,11 +120,28 @@ public class RealtimeManager implements QuerySegmentWalker
} }
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
{ {
final FireChief chief = chiefs.get(getDataSourceName(query)); final List<String> names = query.getDataSource().getNames();
if (names.size() == 1) {
final FireChief chief = chiefs.get(names.get(0));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query); return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
} else {
return new UnionQueryRunner<>(
Iterables.transform(
names, new Function<String, QueryRunner<T>>()
{
@Nullable
@Override
public QueryRunner<T> apply(@Nullable String input)
{
final FireChief chief = chiefs.get(input);
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
}
}
), conglomerate.findFactory(query).getToolchest()
);
}
} }
private <T> String getDataSourceName(Query<T> query) private <T> String getDataSourceName(Query<T> query)

View File

@ -82,7 +82,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>( final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration( toolChest.postMergeQueryDecoration(
toolChest.mergeResults( toolChest.mergeResults(
new UnionQueryRunner<T>(
new MetricsEmittingQueryRunner<T>( new MetricsEmittingQueryRunner<T>(
emitter, emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>() new Function<Query<T>, ServiceMetricEvent.Builder>()
@ -94,9 +93,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
} }
}, },
toolChest.preMergeQueryDecoration(baseClient) toolChest.preMergeQueryDecoration(baseClient)
).withWaitMeasuredFromNow(), ).withWaitMeasuredFromNow()
toolChest
)
) )
), ),
toolChest toolChest

View File

@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner; import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache; import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.ServerSelector;
import io.druid.collections.CountingMap; import io.druid.collections.CountingMap;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
@ -42,6 +43,7 @@ import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner; import io.druid.query.NoopQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
@ -57,7 +59,9 @@ import io.druid.segment.Segment;
import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.UnionTimeLineLookup;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.PartitionHolder;
@ -67,6 +71,7 @@ import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -163,7 +168,7 @@ public class ServerManager implements QuerySegmentWalker
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource); VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) { if (loadedIntervals == null) {
loadedIntervals = new VersionedIntervalTimeline<String, ReferenceCountingSegment>(Ordering.natural()); loadedIntervals = new VersionedIntervalTimeline<>(Ordering.natural());
dataSources.put(dataSource, loadedIntervals); dataSources.put(dataSource, loadedIntervals);
} }
@ -250,12 +255,11 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest(); final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (!(dataSource instanceof TableDataSource)) { if (dataSource instanceof QueryDataSource) {
throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
} }
String dataSourceName = getDataSourceName(dataSource);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName); final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
if (timeline == null) { if (timeline == null) {
return new NoopQueryRunner<T>(); return new NoopQueryRunner<T>();
@ -302,7 +306,6 @@ public class ServerManager implements QuerySegmentWalker
holder.getVersion(), holder.getVersion(),
input.getChunkNumber() input.getChunkNumber()
) )
); );
} }
} }
@ -319,9 +322,26 @@ public class ServerManager implements QuerySegmentWalker
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
} }
private String getDataSourceName(DataSource dataSource) private TimelineLookup<String, ReferenceCountingSegment> getTimelineLookup(DataSource dataSource)
{ {
return Iterables.getOnlyElement(dataSource.getNames()); final List<String> names = dataSource.getNames();
if(names.size() == 1){
return dataSources.get(names.get(0));
} else {
return new UnionTimeLineLookup<>(
Iterables.transform(
names, new Function<String, TimelineLookup<String, ReferenceCountingSegment>>()
{
@Override
public TimelineLookup<String, ReferenceCountingSegment> apply(String input)
{
return dataSources.get(input);
}
}
)
);
}
} }
@Override @Override
@ -337,9 +357,7 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest(); final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
String dataSourceName = getDataSourceName(query.getDataSource()); final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
if (timeline == null) { if (timeline == null) {
return new NoopQueryRunner<T>(); return new NoopQueryRunner<T>();
@ -354,6 +372,7 @@ public class ServerManager implements QuerySegmentWalker
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Iterable<QueryRunner<T>> apply(SegmentDescriptor input) public Iterable<QueryRunner<T>> apply(SegmentDescriptor input)
{ {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry( final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
input.getInterval(), input.getVersion() input.getInterval(), input.getVersion()
); );