Merge pull request #1602 from metamx/more-code-cleanup

Some perf Improvements in Broker
This commit is contained in:
Charles Allen 2015-08-11 13:51:49 -07:00
commit be89105621
3 changed files with 24 additions and 47 deletions

View File

@ -19,6 +19,7 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -90,10 +91,12 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
}
return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
}
else {
return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator);
}
}
};
}

View File

@ -257,14 +257,14 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override
public Sequence<T> get()
{
ArrayList<Pair<Interval, Sequence<T>>> sequencesByInterval = Lists.newArrayList();
ArrayList<Sequence<T>> sequencesByInterval = Lists.newArrayList();
addSequencesFromCache(sequencesByInterval);
addSequencesFromServer(sequencesByInterval);
return mergeCachedAndUncachedSequences(sequencesByInterval, toolChest);
}
private void addSequencesFromCache(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences)
{
if (strategy == null) {
return;
@ -301,11 +301,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
}
);
listOfSequences.add(Pair.of(cachedResultPair.lhs, Sequences.map(cachedSequence, pullFromCacheFunction)));
listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction));
}
}
private void addSequencesFromServer(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
private void addSequencesFromServer(ArrayList<Sequence<T>> listOfSequences)
{
listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size());
@ -326,7 +326,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
final List<Interval> intervals = segmentSpec.getIntervals();
final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
@ -466,12 +465,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
listOfSequences.add(
Pair.of(
new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),
resultSeqToAdd
)
);
listOfSequences.add(resultSeqToAdd);
}
}
}// End of Supplier
@ -479,7 +473,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
protected Sequence<T> mergeCachedAndUncachedSequences(
List<Pair<Interval, Sequence<T>>> sequencesByInterval,
List<Sequence<T>> sequencesByInterval,
QueryToolChest<T, Query<T>> toolChest
)
{
@ -489,17 +483,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
return toolChest.mergeSequencesUnordered(
Sequences.simple(
Lists.transform(
sequencesByInterval,
new Function<Pair<Interval, Sequence<T>>, Sequence<T>>()
{
@Override
public Sequence<T> apply(Pair<Interval, Sequence<T>> input)
{
return input.rhs;
}
}
)
sequencesByInterval
)
);
}

View File

@ -762,24 +762,15 @@ public class CachingClusteredClientTest
@Test
public void testOutOfOrderSequenceMerging() throws Exception
{
List<Pair<Interval, Sequence<Result<TopNResultValue>>>> sequences =
List<Sequence<Result<TopNResultValue>>> sequences =
Lists.newArrayList(
Pair.of(
// this could ne the result of a historical node returning the merged result of
// a) an empty result for segment 2011-01-02/2011-01-05
// and b) result for a second partition for 2011-01-05/2011-01-10
new Interval("2011-01-02/2011-01-10"),
Sequences.simple(
makeTopNResults(
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
)
)
),
Pair.of(
new Interval("2011-01-05/2011-01-10"),
Sequences.simple(
makeTopNResults(
new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
@ -788,7 +779,6 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
)
)
)
);
TestHelper.assertExpectedResults(