A set of fixes to retry the query for missing intervals in the timeline

This commit is contained in:
fjy 2014-11-20 12:04:37 -08:00
parent 2f08ab85fc
commit 3d9d989a9f
43 changed files with 438 additions and 201 deletions

View File

@ -49,11 +49,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query, context);
final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
@ -68,6 +68,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
)
);
}
return base.run(query, context);
return base.run(query, responseContext);
}
}

View File

@ -37,13 +37,13 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> context)
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
return baseRunner.run(query, context);
return baseRunner.run(query, responseContext);
}
return doRun(baseRunner, query, context);
return doRun(baseRunner, query, responseContext);
}
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);

View File

@ -20,7 +20,6 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -89,12 +88,12 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.queryables = Iterables.unmodifiableIterable(queryables);
this.queryWatcher = queryWatcher;
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
@ -125,11 +124,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query, context);
Sequence<T> result = input.run(query, responseContext);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}

View File

@ -39,7 +39,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
return Sequences.concat(
Sequences.map(
@ -49,7 +49,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> apply(final QueryRunner<T> input)
{
return input.run(query, context);
return input.run(query, responseContext);
}
}
)

View File

@ -49,7 +49,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
@ -102,7 +102,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
return Sequences.map(
baseRunner.run(queryToRun, context),
baseRunner.run(queryToRun, responseContext),
finalizerFn
);

View File

@ -78,7 +78,7 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> context)
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
@ -111,10 +111,10 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
{
try {
if (bySegment) {
input.run(queryParam, context)
input.run(queryParam, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryParam, context)
input.run(queryParam, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}

View File

@ -49,10 +49,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (period.getMillis() == 0) {
return baseRunner.run(query, context);
return baseRunner.run(query, responseContext);
}
return Sequences.concat(
@ -76,7 +76,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{
return baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
context
responseContext
);
}
}

View File

@ -84,7 +84,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId();
@ -102,7 +102,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis();
try {
retVal = queryRunner.run(query, context).accumulate(outType, accumulator);
retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");
@ -132,7 +132,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis();
try {
retVal = queryRunner.run(query, context).toYielder(initValue, accumulator);
retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");

View File

@ -30,7 +30,7 @@ import java.util.Map;
public class NoopQueryRunner<T> implements QueryRunner<T>
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> context)
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return Sequences.empty();
}

View File

@ -27,5 +27,5 @@ import java.util.Map;
*/
public interface QueryRunner<T>
{
public Sequence<T> run(Query<T> query, Map<String, Object> context);
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
}

View File

@ -45,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, context);
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
return new ResourceClosingSequence<T>(baseSequence, closeable);
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
public class ReportTimelineMissingIntervalQueryRunner<T> implements QueryRunner<T>
{
private final Interval interval;
public ReportTimelineMissingIntervalQueryRunner(Interval interval)
{
this.interval = interval;
}
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
)
{
List<Interval> missingIntervals = (List<Interval>) responseContext.get(Result.MISSING_INTERVALS_KEY);
if (missingIntervals == null) {
missingIntervals = Lists.newArrayList();
responseContext.put(Result.MISSING_INTERVALS_KEY, missingIntervals);
}
missingIntervals.add(interval);
return Sequences.empty();
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/**
*/
public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final SegmentDescriptor descriptor;
public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor)
{
this.descriptor = descriptor;
}
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(descriptor);
return Sequences.empty();
}
}

View File

@ -27,6 +27,9 @@ import org.joda.time.DateTime;
*/
public class Result<T> implements Comparable<Result<T>>
{
public static String MISSING_SEGMENTS_KEY = "missingSegments";
public static String MISSING_INTERVALS_KEY = "missingIntervals";
private final DateTime timestamp;
private final T value;

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -28,15 +29,16 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.emitter.EmittingLogger;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
public static String MISSING_SEGMENTS_KEY = "missingSegments";
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner;
@ -58,10 +60,10 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final List<Sequence<T>> listOfSequences = Lists.newArrayList();
listOfSequences.add(baseRunner.run(query, context));
listOfSequences.add(baseRunner.run(query, responseContext));
return new YieldingSequenceBase<T>()
{
@ -70,48 +72,99 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
if (!missingSegments.isEmpty()) {
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec(
// Try to find missing segments
doRetryLogic(
responseContext,
Result.MISSING_SEGMENTS_KEY,
new TypeReference<List<SegmentDescriptor>>()
{
},
new Function<List<SegmentDescriptor>, Query<T>>()
{
@Override
public Query<T> apply(List<SegmentDescriptor> input)
{
return query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
input
)
);
Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
listOfSequences.add(retrySequence);
if (getMissingSegments(context).isEmpty()) {
break;
}
}
},
listOfSequences
);
final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
// Try to find missing intervals
doRetryLogic(
responseContext,
Result.MISSING_INTERVALS_KEY,
new TypeReference<List<Interval>>()
{
},
new Function<List<Interval>, Query<T>>()
{
@Override
public Query<T> apply(List<Interval> input)
{
return query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
input
)
);
}
},
listOfSequences
);
return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
}
};
}
private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
private <Type> void doRetryLogic(
final Map<String, Object> responseContext,
final String key,
final TypeReference<List<Type>> typeReference,
final Function<List<Type>, Query<T>> function,
final List<Sequence<T>> listOfSequences
)
{
final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY);
if (maybeMissingSegments == null) {
final List<Type> missingItems = getMissingItems(responseContext, key, typeReference);
if (!missingItems.isEmpty()) {
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing items found. Retry attempt [%,d]", missingItems.size(), i);
responseContext.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = function.apply(missingItems);
Sequence<T> retrySequence = baseRunner.run(retryQuery, responseContext);
listOfSequences.add(retrySequence);
if (getMissingItems(responseContext, key, typeReference).isEmpty()) {
break;
}
}
final List<Type> finalMissingItems = getMissingItems(responseContext, key, typeReference);
if (!config.isReturnPartialResults() && !finalMissingItems.isEmpty()) {
throw new SegmentMissingException("No results found for items[%s]", finalMissingItems);
}
}
}
private <Type> List<Type> getMissingItems(
final Map<String, Object> context,
final String key,
final TypeReference<List<Type>> typeReference
)
{
final Object maybeMissing = context.get(key);
if (maybeMissing == null) {
return Lists.newArrayList();
}
return jsonMapper.convertValue(
maybeMissingSegments,
new TypeReference<List<SegmentDescriptor>>()
{
}
maybeMissing,
typeReference
);
}
}

View File

@ -39,13 +39,13 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), context);
return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), responseContext);
} else {
return baseRunner.run(query, context);
return baseRunner.run(query, responseContext);
}
}
}

View File

@ -81,7 +81,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final long offset = computeOffset(now);
@ -93,7 +93,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return Sequences.map(
baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
context
responseContext
),
new Function<T, T>()
{

View File

@ -44,7 +44,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
@ -59,7 +59,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
return baseRunner.run(
query.withDataSource(singleSource),
context
responseContext
);
}
}
@ -67,7 +67,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
)
);
} else {
return baseRunner.run(query, context);
return baseRunner.run(query, responseContext);
}
}

View File

@ -111,16 +111,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> context)
public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
{
if (input.getContextBySegment(false)) {
return runner.run(input, context);
return runner.run(input, responseContext);
}
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context);
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner,
responseContext
);
}
return runner.run(input, context);
return runner.run(input, responseContext);
}
};
}

View File

@ -108,7 +108,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(final Query<Row> query, final Map<String, Object> context)
public Sequence<Row> run(final Query<Row> query, final Map<String, Object> responseContext)
{
final GroupByQuery queryParam = (GroupByQuery) query;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper
@ -128,13 +128,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
public Void call() throws Exception
{
if (bySegment) {
input.run(queryParam, context)
input.run(queryParam, responseContext)
.accumulate(
bySegmentAccumulatorPair.lhs,
bySegmentAccumulatorPair.rhs
);
} else {
input.run(query, context)
input.run(query, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
@ -203,7 +203,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> context)
public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
{
if (!(input instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);

View File

@ -77,7 +77,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> context)
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
@ -138,7 +138,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public Sequence<SegmentAnalysis> run(
final Query<SegmentAnalysis> query,
final Map<String, Object> context
final Map<String, Object> responseContext
)
{
final int priority = query.getContextPriority(0);
@ -148,7 +148,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public Sequence<SegmentAnalysis> call() throws Exception
{
return input.run(query, context);
return input.run(query, responseContext);
}
}
);

View File

@ -280,7 +280,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof SearchQuery)) {
@ -289,13 +289,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final SearchQuery query = (SearchQuery) input;
if (query.getLimit() < config.getMaxSearchLimit()) {
return runner.run(query, context);
return runner.run(query, responseContext);
}
final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit()), context),
runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext),
new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
{
@Override

View File

@ -73,7 +73,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
@Override
public Sequence<Result<SearchResultValue>> run(
final Query<Result<SearchResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof SearchQuery)) {

View File

@ -92,7 +92,7 @@ public class SelectQueryRunnerFactory
@Override
public Sequence<Result<SelectResultValue>> run(
Query<Result<SelectResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof SelectQuery)) {

View File

@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.RetryQueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.SegmentMissingException;
@ -53,7 +53,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> input, final Map<String, Object> context)
public Sequence<T> run(final Query<T> input, final Map<String, Object> responseContext)
{
final Query<T> query = input.withQuerySegmentSpec(specificSpec);
@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> call() throws Exception
{
return base.run(query, context);
return base.run(query, responseContext);
}
}
);
@ -87,10 +87,10 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
return baseSequence.accumulate(initValue, accumulator);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
return initValue;

View File

@ -87,7 +87,7 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof TimeBoundaryQuery)) {

View File

@ -93,7 +93,7 @@ public class TimeseriesQueryRunnerFactory
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof TimeseriesQuery)) {

View File

@ -414,7 +414,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof TopNQuery)) {
@ -423,13 +423,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
final TopNQuery query = (TopNQuery) input;
if (query.getThreshold() > minTopNThreshold) {
return runner.run(query, context);
return runner.run(query, responseContext);
}
final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold), context),
runner.run(query.withThreshold(minTopNThreshold), responseContext),
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override

View File

@ -67,7 +67,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if (!(input instanceof TopNQuery)) {

View File

@ -279,7 +279,7 @@ public class ChainedExecutionQueryRunnerTest
}
@Override
public Sequence<Integer> run(Query<Integer> query, Map<String, Object> context)
public Sequence<Integer> run(Query<Integer> query, Map<String, Object> responseContext)
{
hasStarted = true;
start.countDown();

View File

@ -46,14 +46,14 @@ public class RetryQueryRunnerTest
public void testRunWithMissingSegments() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map context)
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map responseContext)
{
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add(
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -70,7 +70,8 @@ public class RetryQueryRunnerTest
new RetryQueryRunnerConfig()
{
@Override
public int getNumTries() {
public int getNumTries()
{
return 0;
}
@ -90,29 +91,67 @@ public class RetryQueryRunnerTest
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
);
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@Test
public void testRunWithMissingInterval() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(Result.MISSING_INTERVALS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new ReportTimelineMissingIntervalQueryRunner<Result<TimeseriesResultValue>>(new Interval("2013/2014")),
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig()
{
@Override
public int getNumTries()
{
return 0;
}
@Override
public boolean isReturnPartialResults()
{
return true;
}
},
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, context),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1
);
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@Test
public void testRetry() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put("count", 0);
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if ((int) context.get("count") == 0) {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add(
if ((int) responseContext.get("count") == 0) {
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -120,7 +159,7 @@ public class RetryQueryRunnerTest
), "test", 1
)
);
context.put("count", 1);
responseContext.put("count", 1);
return Sequences.empty();
} else {
return Sequences.simple(
@ -159,7 +198,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
);
}
@ -168,18 +207,18 @@ public class RetryQueryRunnerTest
{
Map<String, Object> context = new MapMaker().makeMap();
context.put("count", 0);
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
if ((int) context.get("count") < 3) {
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add(
if ((int) responseContext.get("count") < 3) {
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -187,7 +226,7 @@ public class RetryQueryRunnerTest
), "test", 1
)
);
context.put("count", (int) context.get("count") + 1);
responseContext.put("count", (int) responseContext.get("count") + 1);
return Sequences.empty();
} else {
return Sequences.simple(
@ -226,7 +265,7 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0
);
}
@ -234,17 +273,17 @@ public class RetryQueryRunnerTest
public void testException() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add(
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -260,12 +299,9 @@ public class RetryQueryRunnerTest
),
new RetryQueryRunnerConfig()
{
private int numTries = 1;
private boolean returnPartialResults = false;
public int getNumTries() { return 1; }
public int getNumTries() { return numTries; }
public boolean returnPartialResults() { return returnPartialResults; }
public boolean returnPartialResults() { return false; }
},
jsonMapper
);
@ -277,7 +313,37 @@ public class RetryQueryRunnerTest
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
);
}
@Test(expected = SegmentMissingException.class)
public void testIntervalMissingCausesException() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new ReportTimelineMissingIntervalQueryRunner<Result<TimeseriesResultValue>>(new Interval("2013/2014")),
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig()
{
public int getNumTries() { return 1; }
public boolean returnPartialResults() { return false; }
},
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, context),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1
);
}
}

View File

@ -82,7 +82,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
return Sequences.simple(
@ -144,7 +144,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
return Sequences.simple(
@ -194,7 +194,7 @@ public class TimewarpOperatorTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context
Map<String, Object> responseContext
)
{
return Sequences.simple(

View File

@ -556,7 +556,7 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> context
Query<Row> query, Map<String, Object> responseContext
)
{
// simulate two daily segments
@ -566,7 +566,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1, context), runner.run(query2, context));
return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
}
}
);
@ -752,7 +752,7 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> context
Query<Row> query, Map<String, Object> responseContext
)
{
// simulate two daily segments
@ -762,7 +762,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1, context), runner.run(query2, context));
return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
}
}
);
@ -1109,7 +1109,7 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> context
Query<Row> query, Map<String, Object> responseContext
)
{
// simulate two daily segments
@ -1119,7 +1119,7 @@ public class GroupByQueryRunnerTest
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1, context), runner.run(query2, context));
return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
}
}
);

View File

@ -95,7 +95,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
QueryRunner timeseriesRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map metadata)
public Sequence run(Query query, Map responseContext)
{
TimeseriesQuery tsQuery = (TimeseriesQuery) query;
@ -109,7 +109,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build(),
metadata
responseContext
),
new Function<Row, Result<TimeseriesResultValue>>()
{

View File

@ -69,7 +69,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
public Sequence run(Query query, Map responseContext)
{
return new Sequence()
{
@ -112,7 +112,7 @@ public class SpecificSegmentQueryRunnerTest
);
Sequences.toList(results, Lists.newArrayList());
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);
@ -149,7 +149,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
public Sequence run(Query query, Map responseContext)
{
return Sequences.withEffect(
Sequences.simple(Arrays.asList(value)),
@ -196,7 +196,7 @@ public class SpecificSegmentQueryRunnerTest
Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);

View File

@ -93,7 +93,7 @@ public class TimeBoundaryQueryRunnerTest
.bound(TimeBoundaryQuery.MAX_TIME)
.build();
Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
@ -115,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest
.bound(TimeBoundaryQuery.MIN_TIME)
.build();
Map<String, Object> context = new MapMaker().makeMap();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList());
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery, context),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()

View File

@ -146,7 +146,8 @@ public class TimeSeriesUnionQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> query,
Map<String, Object> context)
Map<String, Object> responseContext
)
{
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple(

View File

@ -117,7 +117,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
@ -319,13 +319,13 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
List<Interval> intervals = segmentSpec.getIntervals();
if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), context);
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
// this could be more efficient, since we only need to reorder results
// for batches of segments with the same segment start time.
resultSeqToAdd = toolChest.mergeSequencesUnordered(
Sequences.map(
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), context),
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext),
new Function<Object, Sequence<T>>()
{
private final Function<T, Object> cacheFn = strategy.prepareForCache();

View File

@ -73,7 +73,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> context)
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
@ -143,7 +143,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
return Sequences.withEffect(
Sequences.map(
base.run(query, context),
base.run(query, responseContext),
new Function<T, T>()
{
@Override
@ -165,7 +165,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
MoreExecutors.sameThreadExecutor()
);
} else {
return base.run(query, context);
return base.run(query, responseContext);
}
}

View File

@ -87,7 +87,7 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> context)
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
try {
Server instance = brokerSelector.pick();

View File

@ -21,7 +21,6 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
@ -48,6 +47,8 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.ReportTimelineMissingIntervalQueryRunner;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner;
@ -261,18 +262,22 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>();
}
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
new Function<Interval, Iterable<QueryRunner<T>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
public Iterable<QueryRunner<T>> apply(final Interval interval)
{
return timeline.lookup(input);
Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = timeline.lookup(interval);
if (holders == null) {
return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingIntervalQueryRunner<T>(interval));
}
}
)
return FunctionalIterable
.create(holders)
.transformCat(
new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
{
@ -282,7 +287,11 @@ public class ServerManager implements QuerySegmentWalker
)
{
if (holder == null) {
return null;
return Arrays.<QueryRunner<T>>asList(
new ReportTimelineMissingIntervalQueryRunner<T>(
interval
)
);
}
return FunctionalIterable
@ -306,17 +315,18 @@ public class ServerManager implements QuerySegmentWalker
);
}
}
)
.filter(Predicates.<QueryRunner<T>>notNull());
);
}
}
);
}
}
)
.filter(
Predicates.<QueryRunner<T>>notNull()
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
return new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
);
}
private String getDataSourceName(DataSource dataSource)
@ -345,7 +355,7 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>();
}
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
.transformCat(
new Function<SegmentDescriptor, Iterable<QueryRunner<T>>>()
@ -359,12 +369,12 @@ public class ServerManager implements QuerySegmentWalker
);
if (entry == null) {
return null;
return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingSegmentQueryRunner<T>(input));
}
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(input.getPartitionNumber());
if (chunk == null) {
return null;
return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingSegmentQueryRunner<T>(input));
}
final ReferenceCountingSegment adapter = chunk.getObject();
@ -373,12 +383,12 @@ public class ServerManager implements QuerySegmentWalker
);
}
}
)
.filter(
Predicates.<QueryRunner<T>>notNull()
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
return new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
);
}
private <T> QueryRunner<T> buildAndDecorateQueryRunner(

View File

@ -124,7 +124,7 @@ public class CachingQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
public Sequence run(Query query, Map responseContext)
{
return resultSeq;
}
@ -214,7 +214,7 @@ public class CachingQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
public Sequence run(Query query, Map responseContext)
{
return Sequences.empty();
}

View File

@ -685,9 +685,9 @@ public class ServerManagerTest
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> context)
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return new BlockingSequence<T>(runner.run(query, context), waitLatch, waitYieldLatch, notifyLatch);
return new BlockingSequence<T>(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch);
}
}