fix retry logic and change the default value of retry to 0

This commit is contained in:
jisookim0513 2014-06-19 17:50:50 -07:00
parent ff980091c5
commit bdb35e2d7e
3 changed files with 41 additions and 30 deletions

View File

@ -20,11 +20,11 @@
package io.druid.query;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import io.druid.segment.SegmentMissingException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -35,7 +35,11 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
private final QueryToolChest<T, Query<T>> toolChest;
private final RetryQueryRunnerConfig config;
public RetryQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest, RetryQueryRunnerConfig config)
public RetryQueryRunner(
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest,
RetryQueryRunnerConfig config
)
{
this.baseRunner = baseRunner;
this.toolChest = toolChest;
@ -43,36 +47,42 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
Sequence<T> returningSeq = baseRunner.run(query, context);
final Sequence<T> returningSeq = baseRunner.run(query, context);
return new YieldingSequenceBase<T>()
{
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
for (int i = config.numTries(); i > 0 && !((List)context.get(missingSegments)).isEmpty(); i--) {
List<SegmentDescriptor> segList= (List<SegmentDescriptor>)context.get(missingSegments);
((List)context.get(missingSegments)).clear();
returningSeq = toolChest.mergeSequences(
Sequences.simple(
Arrays.asList(
returningSeq,
baseRunner.run(
query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(segList)),
context
)
)
)
);
}
if (((List) context.get(missingSegments)).isEmpty()) {
return yielder;
}
if (!config.returnPartialResults() && !((List)context.get(missingSegments)).isEmpty()) {
String failedSegments = "";
for (SegmentDescriptor segment : (List<SegmentDescriptor>) context.get("missingSegments")) {
failedSegments = failedSegments + segment.toString() + " ";
for (int i = config.numTries(); i > 0 && !((List) context.get(missingSegments)).isEmpty(); i--) {
((List) context.get(missingSegments)).clear();
yielder = baseRunner.run(query, context).toYielder(initValue, accumulator);
if (((List) context.get(missingSegments)).isEmpty()) {
break;
}
}
if (!config.returnPartialResults() && !((List) context.get(missingSegments)).isEmpty()) {
String failedSegments = "";
for (SegmentDescriptor segment : (List<SegmentDescriptor>) context.get("missingSegments")) {
failedSegments = failedSegments + segment.toString() + " ";
}
throw new SegmentMissingException("The following segments are missing: " + failedSegments);
}
return yielder;
}
throw new SegmentMissingException("The following segments are missing: " + failedSegments);
}
return returningSeq;
};
}
}

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RetryQueryRunnerConfig
{
@JsonProperty
private int numTries = 1;
private int numTries = 0;
private boolean returnPartialResults = false;
public int numTries() { return numTries; }

View File

@ -253,6 +253,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
addSequencesFromServer(listOfSequences);
addSequencesFromCache(listOfSequences);
Collections.sort(
listOfSequences,
Ordering.natural().onResultOf(Pair.<DateTime, Sequence<T>>lhsFn())