Merge pull request #1730 from metamx/union-queries-fix

fix #1727 - Union bySegment queries fix
This commit is contained in:
Charles Allen 2015-09-29 12:23:25 -07:00
commit 2d847ad654
12 changed files with 239 additions and 381 deletions

View File

@ -1,78 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.timeline;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.metamx.common.guava.Comparators;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.util.Comparator;
public class UnionTimeLineLookup<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{
Iterable<TimelineLookup<VersionType, ObjectType>> delegates;
public UnionTimeLineLookup(Iterable<TimelineLookup<VersionType, ObjectType>> delegates)
{
// delegate can be null in case there is no segment loaded for the dataSource on this node
this.delegates = Iterables.filter(delegates, Predicates.notNull());
}
@Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(final Interval interval)
{
return Iterables.mergeSorted(
Iterables.transform(
delegates,
new Function<TimelineLookup<VersionType, ObjectType>, Iterable<TimelineObjectHolder<VersionType, ObjectType>>>()
{
@Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> apply(TimelineLookup<VersionType, ObjectType> input)
{
return input.lookup(interval);
}
}
),
new Comparator<TimelineObjectHolder<VersionType, ObjectType>>()
{
@Override
public int compare(
TimelineObjectHolder<VersionType, ObjectType> o1, TimelineObjectHolder<VersionType, ObjectType> o2
)
{
return Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval());
}
}
);
}
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
{
for (TimelineLookup<VersionType, ObjectType> delegate : delegates) {
final PartitionHolder<ObjectType> entry = delegate.findEntry(interval, version);
if (entry != null) {
return entry;
}
}
return null;
}
}

View File

@ -1596,79 +1596,4 @@ public class VersionedIntervalTimelineTest
return new VersionedIntervalTimeline<String, Integer>(Ordering.<String>natural());
}
@Test
public void testUnionTimeLineLookup()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
timeline
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09")))
);
}
@Test
public void testUnionTimeLineLookupNonExistentDelegates()
{
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
timeline,
null,
timeline,
null
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-01/2011-04-02", "3", 5),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-02/2011-04-06", "2", 1),
createExpected("2011-04-06/2011-04-09", "3", 4),
createExpected("2011-04-06/2011-04-09", "3", 4)
),
(List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) );
}
@Test
public void testUnionTimeLineLookupReturnsSortedValues()
{
timeline = makeStringIntegerTimeline();
add("2011-04-02/2011-04-06", "1", 1);
add("2011-04-03/2011-04-09", "9", 2);
VersionedIntervalTimeline t1 = timeline;
timeline = makeStringIntegerTimeline();
add("2011-04-01/2011-04-03", "2", 1);
add("2011-04-03/2011-04-10", "8", 2);
VersionedIntervalTimeline t2 = timeline;
TimelineLookup<String, Integer> lookup = new UnionTimeLineLookup<String, Integer>(
Arrays.<TimelineLookup<String, Integer>>asList(
t1, t2
)
);
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-03", "2", 1),
createExpected("2011-04-02/2011-04-03", "1", 1),
createExpected("2011-04-03/2011-04-09", "9", 2),
createExpected("2011-04-03/2011-04-10", "8", 2)
),
(List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11")))
);
}
}

View File

@ -28,6 +28,7 @@ This data source unions two or more table data sources.
```
Note that the data sources being unioned should have the same schema.
Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes.
### Query Data Source

View File

@ -153,43 +153,29 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return getQueryRunnerImpl(query);
}
private <T> QueryRunner<T> getQueryRunnerImpl(final Query<T> query)
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
return new UnionQueryRunner<>(
Iterables.transform(
query.getDataSource().getNames(), new Function<String, QueryRunner>()
{
@Override
public QueryRunner apply(String queryDataSource)
{
QueryRunner<T> queryRunner = null;
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}
if (queryRunner != null) {
return queryRunner;
} else {
return new NoopQueryRunner();
}
}
}
), conglomerate.findFactory(query).getToolchest()
);
if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.emit();
}
}
}
}
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
}
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem

View File

@ -18,7 +18,7 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -26,35 +26,34 @@ import java.util.Map;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final Iterable<QueryRunner> baseRunners;
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
public UnionQueryRunner(
Iterable<QueryRunner> baseRunners,
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest
)
{
this.baseRunners = baseRunners;
this.baseRunner = baseRunner;
this.toolChest = toolChest;
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (Iterables.size(baseRunners) == 1) {
return Iterables.getOnlyElement(baseRunners).run(query, responseContext);
} else {
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered(
Sequences.simple(
Iterables.transform(
baseRunners,
new Function<QueryRunner, Sequence<T>>()
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(QueryRunner singleRunner)
public Sequence<T> apply(DataSource singleSource)
{
return singleRunner.run(
query,
return baseRunner.run(
query.withDataSource(singleSource),
responseContext
);
}
@ -62,6 +61,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
)
)
);
} else {
return baseRunner.run(query, responseContext);
}
}

View File

@ -253,20 +253,19 @@ public class QueryRunnerTestHelper
return Arrays.asList(
new Object[][]{
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource)
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex),
unionDataSource
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
)
},
{
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource)
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
}
}
);
@ -341,28 +340,17 @@ public class QueryRunnerTestHelper
}
public static <T> QueryRunner<T> makeUnionQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final Segment adapter,
final DataSource unionDataSource
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.getToolchest().postMergeQueryDecoration(
factory.getToolchest().mergeResults(
new UnionQueryRunner<T>(
Iterables.transform(
unionDataSource.getNames(), new Function<String, QueryRunner>()
{
@Nullable
@Override
public QueryRunner apply(@Nullable String input)
{
return new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
);
}
}
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
)

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.query;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.junit.Test;
public class UnionQueryRunnerTest
{
@Test
public void testUnionQueryRunner()
{
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
// verify that table datasource is passed to baseQueryRunner
Assert.assertTrue(query.getDataSource() instanceof TableDataSource);
String dsName = Iterables.getOnlyElement(query.getDataSource().getNames());
if (dsName.equals("ds1")) {
responseContext.put("ds1", "ds1");
return Sequences.simple(Arrays.asList(1, 2, 3));
} else if (dsName.equals("ds2")) {
responseContext.put("ds2", "ds2");
return Sequences.simple(Arrays.asList(4, 5, 6));
} else {
throw new AssertionError("Unexpected DataSource");
}
}
};
UnionQueryRunner runner = new UnionQueryRunner(
baseRunner,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
// Make a dummy query with Union datasource
Query q = Druids.newTimeseriesQueryBuilder()
.dataSource(
new UnionDataSource(
Arrays.asList(
new TableDataSource("ds1"),
new TableDataSource("ds2")
)
)
)
.intervals("2014-01-01T00:00:00Z/2015-01-01T00:00:00Z")
.aggregators(QueryRunnerTestHelper.commonAggregators)
.build();
Map<String, Object> responseContext = Maps.newHashMap();
Sequence result = runner.run(q, responseContext);
List res = Sequences.toList(result, Lists.newArrayList());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res);
// verify response context
Assert.assertEquals(2, responseContext.size());
Assert.assertEquals("ds1", responseContext.get("ds1"));
Assert.assertEquals("ds2", responseContext.get("ds2"));
}
}

View File

@ -137,99 +137,84 @@ public class TimeSeriesUnionQueryRunnerTest
)
)
.build();
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
QueryRunner mergingrunner = toolChest.mergeResults(
new UnionQueryRunner<Result<TimeseriesResultValue>>(
(Iterable) Arrays.asList(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
)
{
return Sequences.simple(
Lists.newArrayList(
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
1L,
"idx",
2L
)
)
),
new Result<>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
3L,
"idx",
4L
)
)
)
)
);
}
},
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
)
{
{
return Sequences.simple(
Lists.newArrayList(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
5L,
"idx",
6L
)
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
)
{
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
1L,
"idx",
2L
)
),
new Result<>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
7L,
"idx",
8L
)
)
),
new Result<>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
9L,
"idx",
10L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-03"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
3L,
"idx",
4L
)
)
)
);
}
}
)
);
} else {
return Sequences.simple(
Lists.newArrayList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
5L,
"idx",
6L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
7L,
"idx",
8L
)
)
),
new Result<TimeseriesResultValue>(
new DateTime("2011-04-04"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows",
9L,
"idx",
10L
)
)
)
)
);
}
),
}
},
toolChest
)
);

View File

@ -39,7 +39,6 @@ import io.druid.query.QueryWatcher;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.UnionTimeLineLookup;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
@ -260,27 +259,11 @@ public class BrokerServerView implements TimelineServerView
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{
final List<String> tables = dataSource.getNames();
String table = Iterables.getOnlyElement(dataSource.getNames());
synchronized (lock) {
if (tables.size() == 1) {
return timelines.get(tables.get(0));
} else {
return new UnionTimeLineLookup<>(
Iterables.transform(
tables, new Function<String, TimelineLookup<String, ServerSelector>>()
{
@Override
public TimelineLookup<String, ServerSelector> apply(String input)
{
return timelines.get(input);
}
}
)
);
}
return timelines.get(table);
}
}

View File

@ -148,34 +148,23 @@ public class RealtimeManager implements QuerySegmentWalker
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final List<String> names = query.getDataSource().getNames();
return new UnionQueryRunner<>(
Iterables.transform(
names, new Function<String, QueryRunner>()
{
@Override
public QueryRunner<T> apply(String input)
{
Iterable<FireChief> chiefsOfDataSource = chiefs.get(input);
return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform(
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireChief input)
{
return input.getQueryRunner(query);
}
}
)
)
);
}
}
), conglomerate.findFactory(query).getToolchest()
Iterable<FireChief> chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames()));
return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform(
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireChief input)
{
return input.getQueryRunner(query);
}
}
)
)
);
}

View File

@ -29,6 +29,7 @@ import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner;
import java.util.Map;
import org.joda.time.Interval;
@ -82,19 +83,22 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
new UnionQueryRunner<T>(
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
baseClient,
toolChest,
retryConfig,
objectMapper)
objectMapper
)
),
toolChest
)
)
),
toolChest
);
final Map<String, Object> context = query.getContext();
PostProcessingOperator<T> postProcessing = null;
if (context != null) {

View File

@ -51,6 +51,7 @@ import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ReferenceCountingSegment;
@ -60,7 +61,6 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.UnionTimeLineLookup;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
@ -260,11 +260,12 @@ public class ServerManager implements QuerySegmentWalker
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
if (!(dataSource instanceof TableDataSource)) {
throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported");
}
String dataSourceName = getDataSourceName(dataSource);
final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
if (timeline == null) {
return new NoopQueryRunner<T>();
@ -334,26 +335,9 @@ public class ServerManager implements QuerySegmentWalker
);
}
private TimelineLookup<String, ReferenceCountingSegment> getTimelineLookup(DataSource dataSource)
private String getDataSourceName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();
if (names.size() == 1) {
return dataSources.get(names.get(0));
} else {
return new UnionTimeLineLookup<>(
Iterables.transform(
names, new Function<String, TimelineLookup<String, ReferenceCountingSegment>>()
{
@Override
public TimelineLookup<String, ReferenceCountingSegment> apply(String input)
{
return dataSources.get(input);
}
}
)
);
}
return Iterables.getOnlyElement(dataSource.getNames());
}
@Override
@ -369,7 +353,11 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final TimelineLookup<String, ReferenceCountingSegment> timeline = getTimelineLookup(query.getDataSource());
String dataSourceName = getDataSourceName(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
dataSourceName
);
if (timeline == null) {
return new NoopQueryRunner<T>();