Merge pull request #865 from metamx/fix-retry-qr

Fix a couple of bugs with retry query runner after testing it locally
This commit is contained in:
xvrl 2014-11-14 13:33:51 -08:00
commit e1e171ad20
8 changed files with 276 additions and 50 deletions

View File

@ -22,11 +22,12 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase; import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.emitter.EmittingLogger;
import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException; import io.druid.segment.SegmentMissingException;
@ -36,18 +37,22 @@ import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T> public class RetryQueryRunner<T> implements QueryRunner<T>
{ {
public static String MISSING_SEGMENTS_KEY = "missingSegments"; public static String MISSING_SEGMENTS_KEY = "missingSegments";
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner; private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
private final RetryQueryRunnerConfig config; private final RetryQueryRunnerConfig config;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
public RetryQueryRunner( public RetryQueryRunner(
QueryRunner<T> baseRunner, QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest,
RetryQueryRunnerConfig config, RetryQueryRunnerConfig config,
ObjectMapper jsonMapper ObjectMapper jsonMapper
) )
{ {
this.baseRunner = baseRunner; this.baseRunner = baseRunner;
this.toolChest = toolChest;
this.config = config; this.config = config;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
@ -55,7 +60,8 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
final Sequence<T> returningSeq = baseRunner.run(query, context); final List<Sequence<T>> listOfSequences = Lists.newArrayList();
listOfSequences.add(baseRunner.run(query, context));
return new YieldingSequenceBase<T>() return new YieldingSequenceBase<T>()
{ {
@ -64,33 +70,32 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
OutType initValue, YieldingAccumulator<OutType, T> accumulator OutType initValue, YieldingAccumulator<OutType, T> accumulator
) )
{ {
Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
final List<SegmentDescriptor> missingSegments = getMissingSegments(context); final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
if (missingSegments.isEmpty()) { if (!missingSegments.isEmpty()) {
return yielder;
}
for (int i = 0; i < config.numTries(); i++) { for (int i = 0; i < config.numTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec( final Query<T> retryQuery = query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec( new MultipleSpecificSegmentSpec(
missingSegments missingSegments
) )
); );
yielder = baseRunner.run(retryQuery, context).toYielder(initValue, accumulator); Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
listOfSequences.add(retrySequence);
if (getMissingSegments(context).isEmpty()) { if (getMissingSegments(context).isEmpty()) {
break; break;
} }
} }
final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context); final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) { if (!finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
} }
}
return yielder; return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
} }
}; };
} }

View File

@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RetryQueryRunnerConfig public class RetryQueryRunnerConfig
{ {
@JsonProperty @JsonProperty
private int numTries = 0; private int numTries = 1;
@JsonProperty
private boolean returnPartialResults = false;
public int numTries() { return numTries; } public int numTries()
public boolean returnPartialResults() { return returnPartialResults; } {
return numTries;
}
} }

View File

@ -23,7 +23,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import io.druid.query.Query; import io.druid.query.Query;
@ -68,20 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> call() throws Exception public Sequence<T> call() throws Exception
{ {
Sequence<T> returningSeq; return base.run(query, context);
try {
returningSeq = base.run(query, context);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
returningSeq = Sequences.empty();
}
return returningSeq;
} }
} }
); );
@ -97,8 +83,19 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public OutType call() throws Exception public OutType call() throws Exception
{ {
try {
return baseSequence.accumulate(initValue, accumulator); return baseSequence.accumulate(initValue, accumulator);
} }
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
return initValue;
}
}
} }
); );
} }

View File

@ -36,8 +36,6 @@ import java.util.List;
*/ */
public class TimeseriesQueryEngine public class TimeseriesQueryEngine
{ {
private static final int AGG_UNROLL_COUNT = 8;
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter) public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
{ {
if (adapter == null) { if (adapter == null) {

View File

@ -9,6 +9,7 @@ import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.SegmentMissingException; import io.druid.segment.SegmentMissingException;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -63,6 +64,9 @@ public class RetryQueryRunnerTest
return Sequences.empty(); return Sequences.empty();
} }
}, },
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig() new RetryQueryRunnerConfig()
{ {
private int numTries = 0; private int numTries = 0;
@ -128,6 +132,9 @@ public class RetryQueryRunnerTest
} }
} }
}, },
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig() new RetryQueryRunnerConfig()
{ {
private int numTries = 1; private int numTries = 1;
@ -192,6 +199,9 @@ public class RetryQueryRunnerTest
} }
} }
}, },
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig() new RetryQueryRunnerConfig()
{ {
private int numTries = 4; private int numTries = 4;
@ -241,6 +251,9 @@ public class RetryQueryRunnerTest
return Sequences.empty(); return Sequences.empty();
} }
}, },
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig() new RetryQueryRunnerConfig()
{ {
private int numTries = 1; private int numTries = 1;

View File

@ -0,0 +1,212 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.spec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.RetryQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregator;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesResultBuilder;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.SegmentMissingException;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class SpecificSegmentQueryRunnerTest
{
@Test
public void testRetry() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
SegmentDescriptor descriptor = new SegmentDescriptor(
new Interval("2012-01-01T00:00:00Z/P1D"),
"version",
0
);
final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner(
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
{
return new Sequence()
{
@Override
public Object accumulate(Object initValue, Accumulator accumulator)
{
throw new SegmentMissingException("FAILSAUCE");
}
@Override
public Yielder<Object> toYielder(
Object initValue, YieldingAccumulator accumulator
)
{
return null;
}
};
}
},
new SpecificSegmentSpec(
descriptor
)
);
final Map<String, Object> responseContext = Maps.newHashMap();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.granularity(QueryGranularity.ALL)
.intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D")))
.aggregators(
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("rows")
)
)
.build();
Sequence results = queryRunner.run(
query,
responseContext
);
Sequences.toList(results, Lists.newArrayList());
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);
Object segmentDesc = ((List) missingSegments).get(0);
Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);
SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);
Assert.assertEquals(descriptor, newDesc);
}
@SuppressWarnings("unchecked")
@Test
public void testRetry2() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
SegmentDescriptor descriptor = new SegmentDescriptor(
new Interval("2012-01-01T00:00:00Z/P1D"),
"version",
0
);
TimeseriesResultBuilder builder = new TimeseriesResultBuilder(
new DateTime("2012-01-01T00:00:00Z")
);
CountAggregator rows = new CountAggregator("rows");
rows.aggregate();
builder.addMetric(rows);
final Result<TimeseriesResultValue> value = builder.build();
final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner(
new QueryRunner()
{
@Override
public Sequence run(Query query, Map context)
{
return Sequences.withEffect(
Sequences.simple(Arrays.asList(value)),
new Runnable()
{
@Override
public void run()
{
throw new SegmentMissingException("FAILSAUCE");
}
},
MoreExecutors.sameThreadExecutor()
);
}
},
new SpecificSegmentSpec(
descriptor
)
);
final Map<String, Object> responseContext = Maps.newHashMap();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.granularity(QueryGranularity.ALL)
.intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D")))
.aggregators(
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("rows")
)
)
.build();
Sequence results = queryRunner.run(
query,
responseContext
);
List<Result<TimeseriesResultValue>> res = Sequences.toList(
results,
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
Assert.assertEquals(1, res.size());
Result<TimeseriesResultValue> theVal = res.get(0);
Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));
Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);
Object segmentDesc = ((List) missingSegments).get(0);
Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);
SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);
Assert.assertEquals(descriptor, newDesc);
}
}

View File

@ -101,6 +101,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
toolChest.preMergeQueryDecoration( toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>( new RetryQueryRunner<T>(
baseClient, baseClient,
toolChest,
retryConfig, retryConfig,
objectMapper objectMapper
) )

View File

@ -153,8 +153,8 @@ public class QueryResource
log.debug("Got query [%s]", query); log.debug("Got query [%s]", query);
} }
final Map<String, Object> context = new MapMaker().makeMap(); final Map<String, Object> responseContext = new MapMaker().makeMap();
final Sequence res = query.run(texasRanger, context); final Sequence res = query.run(texasRanger, responseContext);
final Sequence results; final Sequence results;
if (res == null) { if (res == null) {
results = Sequences.empty(); results = Sequences.empty();
@ -212,7 +212,7 @@ public class QueryResource
isSmile ? APPLICATION_JSON : APPLICATION_SMILE isSmile ? APPLICATION_JSON : APPLICATION_SMILE
) )
.header("X-Druid-Query-Id", queryId) .header("X-Druid-Query-Id", queryId)
.header("X-Druid-Response-Context", jsonMapper.writeValueAsString(context)) .header("X-Druid-Response-Context", jsonMapper.writeValueAsString(responseContext))
.build(); .build();
} }
catch (Exception e) { catch (Exception e) {