Fix a couple of bugs with retry query runner after testing it locally

This commit is contained in:
fjy 2014-11-13 11:53:29 -08:00
parent d4ca805cb9
commit 0d6816a037
4 changed files with 21 additions and 22 deletions

View File

@ -27,6 +27,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase; import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.emitter.EmittingLogger;
import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException; import io.druid.segment.SegmentMissingException;
@ -36,6 +37,7 @@ import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T> public class RetryQueryRunner<T> implements QueryRunner<T>
{ {
public static String MISSING_SEGMENTS_KEY = "missingSegments"; public static String MISSING_SEGMENTS_KEY = "missingSegments";
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner; private final QueryRunner<T> baseRunner;
private final RetryQueryRunnerConfig config; private final RetryQueryRunnerConfig config;
@ -73,6 +75,8 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
} }
for (int i = 0; i < config.numTries(); i++) { for (int i = 0; i < config.numTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec( final Query<T> retryQuery = query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec( new MultipleSpecificSegmentSpec(

View File

@ -23,7 +23,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import io.druid.query.Query; import io.druid.query.Query;
@ -68,20 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> call() throws Exception public Sequence<T> call() throws Exception
{ {
Sequence<T> returningSeq; return base.run(query, context);
try {
returningSeq = base.run(query, context);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
returningSeq = Sequences.empty();
}
return returningSeq;
} }
} }
); );
@ -97,7 +83,18 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public OutType call() throws Exception public OutType call() throws Exception
{ {
return baseSequence.accumulate(initValue, accumulator); try {
return baseSequence.accumulate(initValue, accumulator);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
return initValue;
}
} }
} }
); );

View File

@ -36,8 +36,6 @@ import java.util.List;
*/ */
public class TimeseriesQueryEngine public class TimeseriesQueryEngine
{ {
private static final int AGG_UNROLL_COUNT = 8;
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter) public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
{ {
if (adapter == null) { if (adapter == null) {
@ -61,7 +59,7 @@ public class TimeseriesQueryEngine
Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs); Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
try { try {
while (!cursor.isDone()) { while (!cursor.isDone()) {
for(Aggregator aggregator : aggregators) { for (Aggregator aggregator : aggregators) {
aggregator.aggregate(); aggregator.aggregate();
} }
cursor.advance(); cursor.advance();

View File

@ -153,8 +153,8 @@ public class QueryResource
log.debug("Got query [%s]", query); log.debug("Got query [%s]", query);
} }
final Map<String, Object> context = new MapMaker().makeMap(); final Map<String, Object> responseContext = new MapMaker().makeMap();
final Sequence res = query.run(texasRanger, context); final Sequence res = query.run(texasRanger, responseContext);
final Sequence results; final Sequence results;
if (res == null) { if (res == null) {
results = Sequences.empty(); results = Sequences.empty();
@ -212,7 +212,7 @@ public class QueryResource
isSmile ? APPLICATION_JSON : APPLICATION_SMILE isSmile ? APPLICATION_JSON : APPLICATION_SMILE
) )
.header("X-Druid-Query-Id", queryId) .header("X-Druid-Query-Id", queryId)
.header("X-Druid-Response-Context", jsonMapper.writeValueAsString(context)) .header("X-Druid-Response-Context", jsonMapper.writeValueAsString(responseContext))
.build(); .build();
} }
catch (Exception e) { catch (Exception e) {