mirror of https://github.com/apache/druid.git
fix retry to actually return correct sequences
This commit is contained in:
parent
6cdd6a6af7
commit
bbc079b880
|
@ -22,8 +22,8 @@ package io.druid.query;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.Yielder;
|
||||
import com.metamx.common.guava.YieldingAccumulator;
|
||||
import com.metamx.common.guava.YieldingSequenceBase;
|
||||
|
@ -40,16 +40,19 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
|
||||
|
||||
private final QueryRunner<T> baseRunner;
|
||||
private final QueryToolChest<T, Query<T>> toolChest;
|
||||
private final RetryQueryRunnerConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public RetryQueryRunner(
|
||||
QueryRunner<T> baseRunner,
|
||||
QueryToolChest<T, Query<T>> toolChest,
|
||||
RetryQueryRunnerConfig config,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.baseRunner = baseRunner;
|
||||
this.toolChest = toolChest;
|
||||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
@ -57,7 +60,9 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
|
||||
{
|
||||
final List<Sequence<T>> listOfSequences = Lists.newArrayList();
|
||||
final Sequence<T> returningSeq = baseRunner.run(query, context);
|
||||
listOfSequences.add(returningSeq);
|
||||
|
||||
return new YieldingSequenceBase<T>()
|
||||
{
|
||||
|
@ -66,12 +71,10 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
OutType initValue, YieldingAccumulator<OutType, T> accumulator
|
||||
)
|
||||
{
|
||||
Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
|
||||
|
||||
final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
|
||||
|
||||
if (missingSegments.isEmpty()) {
|
||||
return yielder;
|
||||
return returningSeq.toYielder(initValue, accumulator);
|
||||
}
|
||||
|
||||
for (int i = 0; i < config.numTries(); i++) {
|
||||
|
@ -83,7 +86,8 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
missingSegments
|
||||
)
|
||||
);
|
||||
yielder = baseRunner.run(retryQuery, context).toYielder(initValue, accumulator);
|
||||
Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
|
||||
listOfSequences.add(retrySequence);
|
||||
if (getMissingSegments(context).isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
@ -94,7 +98,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
|
||||
}
|
||||
|
||||
return yielder;
|
||||
return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import com.metamx.common.guava.Sequences;
|
|||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.SegmentMissingException;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -63,6 +64,9 @@ public class RetryQueryRunnerTest
|
|||
return Sequences.empty();
|
||||
}
|
||||
},
|
||||
(QueryToolChest) new TimeseriesQueryQueryToolChest(
|
||||
new QueryConfig()
|
||||
),
|
||||
new RetryQueryRunnerConfig()
|
||||
{
|
||||
private int numTries = 0;
|
||||
|
@ -128,6 +132,9 @@ public class RetryQueryRunnerTest
|
|||
}
|
||||
}
|
||||
},
|
||||
(QueryToolChest) new TimeseriesQueryQueryToolChest(
|
||||
new QueryConfig()
|
||||
),
|
||||
new RetryQueryRunnerConfig()
|
||||
{
|
||||
private int numTries = 1;
|
||||
|
@ -192,6 +199,9 @@ public class RetryQueryRunnerTest
|
|||
}
|
||||
}
|
||||
},
|
||||
(QueryToolChest) new TimeseriesQueryQueryToolChest(
|
||||
new QueryConfig()
|
||||
),
|
||||
new RetryQueryRunnerConfig()
|
||||
{
|
||||
private int numTries = 4;
|
||||
|
@ -241,6 +251,9 @@ public class RetryQueryRunnerTest
|
|||
return Sequences.empty();
|
||||
}
|
||||
},
|
||||
(QueryToolChest) new TimeseriesQueryQueryToolChest(
|
||||
new QueryConfig()
|
||||
),
|
||||
new RetryQueryRunnerConfig()
|
||||
{
|
||||
private int numTries = 1;
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.guava.Accumulator;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
@ -33,22 +34,27 @@ import io.druid.jackson.DefaultObjectMapper;
|
|||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.RetryQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregator;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesResultBuilder;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.SegmentMissingException;
|
||||
import junit.framework.Assert;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SpecificSegmentQueryRunnerTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testRetry() throws Exception
|
||||
{
|
||||
|
@ -119,4 +125,88 @@ public class SpecificSegmentQueryRunnerTest
|
|||
|
||||
Assert.assertEquals(descriptor, newDesc);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testRetry2() throws Exception
|
||||
{
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
SegmentDescriptor descriptor = new SegmentDescriptor(
|
||||
new Interval("2012-01-01T00:00:00Z/P1D"),
|
||||
"version",
|
||||
0
|
||||
);
|
||||
|
||||
TimeseriesResultBuilder builder = new TimeseriesResultBuilder(
|
||||
new DateTime("2012-01-01T00:00:00Z")
|
||||
);
|
||||
CountAggregator rows = new CountAggregator("rows");
|
||||
rows.aggregate();
|
||||
builder.addMetric(rows);
|
||||
final Result<TimeseriesResultValue> value = builder.build();
|
||||
|
||||
final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner(
|
||||
new QueryRunner()
|
||||
{
|
||||
@Override
|
||||
public Sequence run(Query query, Map context)
|
||||
{
|
||||
return Sequences.withEffect(
|
||||
Sequences.simple(Arrays.asList(value)),
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
throw new SegmentMissingException("FAILSAUCE");
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor()
|
||||
);
|
||||
}
|
||||
},
|
||||
new SpecificSegmentSpec(
|
||||
descriptor
|
||||
)
|
||||
);
|
||||
|
||||
final Map<String, Object> responseContext = Maps.newHashMap();
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("foo")
|
||||
.granularity(QueryGranularity.ALL)
|
||||
.intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D")))
|
||||
.aggregators(
|
||||
ImmutableList.<AggregatorFactory>of(
|
||||
new CountAggregatorFactory("rows")
|
||||
)
|
||||
)
|
||||
.build();
|
||||
Sequence results = queryRunner.run(
|
||||
query,
|
||||
responseContext
|
||||
);
|
||||
List<Result<TimeseriesResultValue>> res = Sequences.toList(
|
||||
results,
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, res.size());
|
||||
|
||||
Result<TimeseriesResultValue> theVal = res.get(0);
|
||||
|
||||
Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));
|
||||
|
||||
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
|
||||
|
||||
Assert.assertTrue(missingSegments != null);
|
||||
Assert.assertTrue(missingSegments instanceof List);
|
||||
|
||||
Object segmentDesc = ((List) missingSegments).get(0);
|
||||
|
||||
Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);
|
||||
|
||||
SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);
|
||||
|
||||
Assert.assertEquals(descriptor, newDesc);
|
||||
}
|
||||
}
|
|
@ -101,6 +101,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
|||
toolChest.preMergeQueryDecoration(
|
||||
new RetryQueryRunner<T>(
|
||||
baseClient,
|
||||
toolChest,
|
||||
retryConfig,
|
||||
objectMapper
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue