fix retry interval is stupid

This commit is contained in:
fjy 2014-11-20 12:50:56 -08:00
parent 3d9d989a9f
commit 47f5c1bd0a
4 changed files with 83 additions and 266 deletions

View File

@ -1,54 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
 * A {@link QueryRunner} that yields no rows. Each call to {@link #run} appends the interval
 * supplied at construction time to the list stored in the response context under
 * {@code Result.MISSING_INTERVALS_KEY}, creating and registering that list first when the
 * context has no entry for the key.
 */
public class ReportTimelineMissingIntervalQueryRunner<T> implements QueryRunner<T>
{
  private final Interval missingInterval;

  /**
   * @param interval the interval that every invocation of {@link #run} reports as missing
   */
  public ReportTimelineMissingIntervalQueryRunner(Interval interval)
  {
    this.missingInterval = interval;
  }

  /**
   * Records the configured interval in {@code responseContext} and returns an empty sequence.
   *
   * @param query           not consulted by this runner
   * @param responseContext map that receives the missing interval under
   *                        {@code Result.MISSING_INTERVALS_KEY}
   *
   * @return an empty sequence
   */
  @Override
  public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
  {
    reportedIntervals(responseContext).add(missingInterval);
    return Sequences.empty();
  }

  /**
   * Returns the list held under {@code Result.MISSING_INTERVALS_KEY}; when the key has no
   * value, a new list is stored in the context and returned.
   */
  private List<Interval> reportedIntervals(Map<String, Object> responseContext)
  {
    final Object existing = responseContext.get(Result.MISSING_INTERVALS_KEY);
    if (existing != null) {
      return (List<Interval>) existing;
    }
    final List<Interval> created = Lists.newArrayList();
    responseContext.put(Result.MISSING_INTERVALS_KEY, created);
    return created;
  }
}

View File

@ -21,7 +21,6 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -29,10 +28,8 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.emitter.EmittingLogger;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
@ -60,10 +57,10 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final List<Sequence<T>> listOfSequences = Lists.newArrayList();
listOfSequences.add(baseRunner.run(query, responseContext));
listOfSequences.add(baseRunner.run(query, context));
return new YieldingSequenceBase<T>()
{
@ -72,100 +69,48 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
// Try to find missing segments
doRetryLogic(
responseContext,
Result.MISSING_SEGMENTS_KEY,
new TypeReference<List<SegmentDescriptor>>()
{
},
new Function<List<SegmentDescriptor>, Query<T>>()
{
@Override
public Query<T> apply(List<SegmentDescriptor> input)
{
return query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
input
)
);
}
},
listOfSequences
);
final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
// Try to find missing intervals
doRetryLogic(
responseContext,
Result.MISSING_INTERVALS_KEY,
new TypeReference<List<Interval>>()
{
},
new Function<List<Interval>, Query<T>>()
{
@Override
public Query<T> apply(List<Interval> input)
{
return query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
input
)
);
}
},
listOfSequences
);
if (!missingSegments.isEmpty()) {
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
)
);
Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
listOfSequences.add(retrySequence);
if (getMissingSegments(context).isEmpty()) {
break;
}
}
final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
}
return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
}
};
}
private <Type> void doRetryLogic(
final Map<String, Object> responseContext,
final String key,
final TypeReference<List<Type>> typeReference,
final Function<List<Type>, Query<T>> function,
final List<Sequence<T>> listOfSequences
)
private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
{
final List<Type> missingItems = getMissingItems(responseContext, key, typeReference);
if (!missingItems.isEmpty()) {
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing items found. Retry attempt [%,d]", missingItems.size(), i);
responseContext.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = function.apply(missingItems);
Sequence<T> retrySequence = baseRunner.run(retryQuery, responseContext);
listOfSequences.add(retrySequence);
if (getMissingItems(responseContext, key, typeReference).isEmpty()) {
break;
}
}
final List<Type> finalMissingItems = getMissingItems(responseContext, key, typeReference);
if (!config.isReturnPartialResults() && !finalMissingItems.isEmpty()) {
throw new SegmentMissingException("No results found for items[%s]", finalMissingItems);
}
}
}
private <Type> List<Type> getMissingItems(
final Map<String, Object> context,
final String key,
final TypeReference<List<Type>> typeReference
)
{
final Object maybeMissing = context.get(key);
if (maybeMissing == null) {
final Object maybeMissingSegments = context.get(Result.MISSING_SEGMENTS_KEY);
if (maybeMissingSegments == null) {
return Lists.newArrayList();
}
return jsonMapper.convertValue(
maybeMissing,
typeReference
maybeMissingSegments,
new TypeReference<List<SegmentDescriptor>>()
{
}
);
}
}

View File

@ -51,9 +51,9 @@ public class RetryQueryRunnerTest
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map responseContext)
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map context)
{
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -96,44 +96,6 @@ public class RetryQueryRunnerTest
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@Test
public void testRunWithMissingInterval() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(Result.MISSING_INTERVALS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new ReportTimelineMissingIntervalQueryRunner<Result<TimeseriesResultValue>>(new Interval("2013/2014")),
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig()
{
@Override
public int getNumTries()
{
return 0;
}
@Override
public boolean isReturnPartialResults()
{
return true;
}
},
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, context),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1
);
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@Test
public void testRetry() throws Exception
@ -147,11 +109,11 @@ public class RetryQueryRunnerTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
Map<String, Object> context
)
{
if ((int) responseContext.get("count") == 0) {
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
if ((int) context.get("count") == 0) {
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -159,7 +121,7 @@ public class RetryQueryRunnerTest
), "test", 1
)
);
responseContext.put("count", 1);
context.put("count", 1);
return Sequences.empty();
} else {
return Sequences.simple(
@ -214,11 +176,11 @@ public class RetryQueryRunnerTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
Map<String, Object> context
)
{
if ((int) responseContext.get("count") < 3) {
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
if ((int) context.get("count") < 3) {
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -226,7 +188,7 @@ public class RetryQueryRunnerTest
), "test", 1
)
);
responseContext.put("count", (int) responseContext.get("count") + 1);
context.put("count", (int) context.get("count") + 1);
return Sequences.empty();
} else {
return Sequences.simple(
@ -280,10 +242,10 @@ public class RetryQueryRunnerTest
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
Map<String, Object> responseContext
Map<String, Object> context
)
{
((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add(
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
new Interval(
178888,
@ -299,9 +261,12 @@ public class RetryQueryRunnerTest
),
new RetryQueryRunnerConfig()
{
public int getNumTries() { return 1; }
private int numTries = 1;
private boolean returnPartialResults = false;
public boolean returnPartialResults() { return false; }
public int getNumTries() { return numTries; }
public boolean returnPartialResults() { return returnPartialResults; }
},
jsonMapper
);
@ -316,34 +281,4 @@ public class RetryQueryRunnerTest
((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1
);
}
@Test(expected = SegmentMissingException.class)
public void testIntervalMissingCausesException() throws Exception
{
Map<String, Object> context = new MapMaker().makeMap();
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new ReportTimelineMissingIntervalQueryRunner<Result<TimeseriesResultValue>>(new Interval("2013/2014")),
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
),
new RetryQueryRunnerConfig()
{
public int getNumTries() { return 1; }
public boolean returnPartialResults() { return false; }
},
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, context),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1
);
}
}

View File

@ -47,7 +47,6 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.ReportTimelineMissingIntervalQueryRunner;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
@ -265,57 +264,47 @@ public class ServerManager implements QuerySegmentWalker
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<QueryRunner<T>>>()
new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
{
@Override
public Iterable<QueryRunner<T>> apply(final Interval interval)
public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
{
Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = timeline.lookup(interval);
if (holders == null) {
return Arrays.<QueryRunner<T>>asList(new ReportTimelineMissingIntervalQueryRunner<T>(interval));
return timeline.lookup(input);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
{
@Override
public Iterable<QueryRunner<T>> apply(
@Nullable
final TimelineObjectHolder<String, ReferenceCountingSegment> holder
)
{
if (holder == null) {
return null;
}
return FunctionalIterable
.create(holders)
.transformCat(
new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
.create(holder.getObject())
.transform(
new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
{
@Override
public Iterable<QueryRunner<T>> apply(
@Nullable final TimelineObjectHolder<String, ReferenceCountingSegment> holder
)
public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
{
if (holder == null) {
return Arrays.<QueryRunner<T>>asList(
new ReportTimelineMissingIntervalQueryRunner<T>(
interval
)
);
}
return buildAndDecorateQueryRunner(
factory,
toolChest,
input.getObject(),
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
{
return buildAndDecorateQueryRunner(
factory,
toolChest,
input.getObject(),
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
);
}
}
);
);
}
}
);
@ -349,7 +338,9 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName = getDataSourceName(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
dataSourceName
);
if (timeline == null) {
return new NoopQueryRunner<T>();