From 7736c3fc276a674313c8e332ce0472660d90cad3 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 14 Nov 2014 13:28:32 -0800 Subject: [PATCH] address cr --- .../java/io/druid/query/RetryQueryRunner.java | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index cd48df0671d..e086711abd0 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -61,8 +61,7 @@ public class RetryQueryRunner implements QueryRunner public Sequence run(final Query query, final Map context) { final List> listOfSequences = Lists.newArrayList(); - final Sequence returningSeq = baseRunner.run(query, context); - listOfSequences.add(returningSeq); + listOfSequences.add(baseRunner.run(query, context)); return new YieldingSequenceBase() { @@ -73,32 +72,38 @@ public class RetryQueryRunner implements QueryRunner { final List missingSegments = getMissingSegments(context); - if (missingSegments.isEmpty()) { - return returningSeq.toYielder(initValue, accumulator); - } + if (!missingSegments.isEmpty()) { + for (int i = 0; i < config.numTries(); i++) { + log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); - for (int i = 0; i < config.numTries(); i++) { - log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); + context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); + final Query retryQuery = query.withQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + missingSegments + ) + ); + Sequence retrySequence = baseRunner.run(retryQuery, context); + listOfSequences.add(retrySequence); + if (getMissingSegments(context).isEmpty()) { + break; + } + } - context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); - final Query retryQuery = query.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - missingSegments - ) - ); - Sequence retrySequence = baseRunner.run(retryQuery, context); - listOfSequences.add(retrySequence); - if (getMissingSegments(context).isEmpty()) { - break; + final List finalMissingSegs = getMissingSegments(context); + if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) { + throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); } } - final List finalMissingSegs = getMissingSegments(context); - if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) { - throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); + + final Sequence retSeq; + if (listOfSequences.size() == 1) { + retSeq = listOfSequences.get(0); + } else { + retSeq = toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)); } - return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); + return retSeq.toYielder(initValue, accumulator); } }; }