Merge pull request #1563 from metamx/simplify-sequence-merging

simplify merging of results on broker
This commit is contained in:
Xavier Léauté 2015-07-28 09:56:28 -07:00
commit 4195deb41b
2 changed files with 133 additions and 34 deletions

View File

@@ -0,0 +1,118 @@
package io.druid.benchmark;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
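// Benchmark-scoped state: all benchmark threads share the same generated input sequences.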
@State(Scope.Benchmark)
public class MergeSequenceBenchmark
{
// Number of sequences to merge
@Param({"1000"})
int count;
// Number of elements in each sequence
@Param({"1000", "10000"})
int sequenceLength;
// Number of sequences to merge at once
@Param({"10", "100"})
int mergeAtOnce;
private List<Sequence<Integer>> sequences;
@Setup
public void setup()
{
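// Build `count` sequences of `sequenceLength` random ints; the fixed seed keeps runs comparable.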
Random rand = new Random(0);
sequences = Lists.newArrayList();
for (int i = 0; i < count; i++) {
int[] sequence = new int[sequenceLength];
for (int j = 0; j < sequenceLength; j++) {
sequence[j] = rand.nextInt();
}
sequences.add(Sequences.simple(Ints.asList(sequence)));
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void mergeHierarchical(Blackhole blackhole)
{
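// Merge the inputs in batches of `mergeAtOnce`, then merge the partial results;
// summing the output forces full evaluation of the merged sequence.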
Iterator<Sequence<Integer>> iterator = sequences.iterator();
List<Sequence<Integer>> partialMerged = new ArrayList<Sequence<Integer>>();
List<Sequence<Integer>> toMerge = new ArrayList<Sequence<Integer>>();
while (iterator.hasNext()) {
toMerge.add(iterator.next());
if (toMerge.size() == mergeAtOnce) {
partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
toMerge = new ArrayList<Sequence<Integer>>();
}
}
if (!toMerge.isEmpty()) {
partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
}
MergeSequence<Integer> mergeSequence = new MergeSequence<Integer>(
Ordering.<Integer>natural(),
Sequences.simple(partialMerged)
);
Integer accumulate = mergeSequence.accumulate(
0, new Accumulator<Integer, Integer>()
{
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
return accumulated + in;
}
}
);
blackhole.consume(accumulate);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void mergeFlat(final Blackhole blackhole)
{
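// Single flat n-way merge across all inputs, summed the same way for comparison with mergeHierarchical.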
MergeSequence<Integer> mergeSequence = new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(sequences));
Integer accumulate = mergeSequence.accumulate(
0, new Accumulator<Integer, Integer>()
{
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
return accumulated + in;
}
}
);
blackhole.consume(accumulate);
}
}
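
The benchmark class above has no main method. Below is a minimal sketch of how it could be launched with the standard JMH runner API; the runner class name and the fork/iteration settings are illustrative assumptions, not part of this commit.

// Hypothetical launcher for MergeSequenceBenchmark; assumes jmh-core is on the classpath.
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class MergeSequenceBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    Options opt = new OptionsBuilder()
        .include(MergeSequenceBenchmark.class.getSimpleName())
        .forks(1)                  // single fork keeps the run short; increase for stabler numbers
        .warmupIterations(5)
        .measurementIterations(10)
        .build();
    new Runner(opt).run();
  }
}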

View File

@@ -27,7 +26,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -37,7 +36,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.BaseSequence;
-import com.metamx.common.guava.Comparators;
 import com.metamx.common.guava.LazySequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
@@ -64,11 +62,8 @@ import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.partition.PartitionChunk;
-import org.joda.time.Interval;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +72,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import org.joda.time.Interval;

 /**
  */
@@ -491,36 +487,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         return Sequences.empty();
       }
-      Collections.sort(
-          sequencesByInterval,
-          Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.<Interval, Sequence<T>>lhsFn())
+      return toolChest.mergeSequencesUnordered(
+          Sequences.simple(
+              Lists.transform(
+                  sequencesByInterval,
+                  new Function<Pair<Interval, Sequence<T>>, Sequence<T>>()
+                  {
+                    @Override
+                    public Sequence<T> apply(Pair<Interval, Sequence<T>> input)
+                    {
+                      return input.rhs;
+                    }
+                  }
+              )
+          )
       );
-      // result sequences from overlapping intervals could start anywhere within that interval
-      // therefore we cannot assume any ordering with respect to the first result from each
-      // and must resort to calling toolchest.mergeSequencesUnordered for those.
-      Iterator<Pair<Interval, Sequence<T>>> iterator = sequencesByInterval.iterator();
-      Pair<Interval, Sequence<T>> current = iterator.next();
-      final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
-      List<Sequence<T>> unordered = Lists.newLinkedList();
-      unordered.add(current.rhs);
-      while (iterator.hasNext()) {
-        Pair<Interval, Sequence<T>> next = iterator.next();
-        if (!next.lhs.overlaps(current.lhs)) {
-          orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
-          unordered = Lists.newLinkedList();
-        }
-        unordered.add(next.rhs);
-        current = next;
-      }
-      if (!unordered.isEmpty()) {
-        orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
-      }
-      return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences));
     }
private static class CachePopulator
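
For reference, a minimal standalone sketch of the merge primitive exercised by the benchmark above, using the com.metamx.common.guava classes shown in its imports; the class name and data are made up for illustration and are not part of this commit.

// Toy illustration (not from this commit): MergeSequence performs an n-way merge,
// interleaving already-sorted input sequences into one sorted output.
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.Arrays;
import java.util.List;

public class MergeSequenceExample
{
  public static void main(String[] args)
  {
    Sequence<Integer> a = Sequences.simple(Arrays.asList(1, 4, 7));
    Sequence<Integer> b = Sequences.simple(Arrays.asList(2, 5, 8));

    Sequence<Integer> merged = new MergeSequence<Integer>(
        Ordering.<Integer>natural(),
        Sequences.simple(Arrays.asList(a, b))
    );

    // Materializing the merged sequence yields [1, 2, 4, 5, 7, 8]
    List<Integer> out = Sequences.toList(merged, Lists.<Integer>newArrayList());
    System.out.println(out);
  }
}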