mirror of https://github.com/apache/druid.git
simplify merging of results on broker.
add benchmark formatting and review comments organize imports
This commit is contained in:
parent
5f1f4424eb
commit
efb6e0649e
|
@ -0,0 +1,118 @@
|
||||||
|
package io.druid.benchmark;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import com.metamx.common.guava.Accumulator;
|
||||||
|
import com.metamx.common.guava.MergeSequence;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
|
import com.metamx.common.guava.Sequences;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||||
|
import org.openjdk.jmh.annotations.Mode;
|
||||||
|
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Param;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
import org.openjdk.jmh.infra.Blackhole;
|
||||||
|
|
||||||
|
@State(Scope.Benchmark)
|
||||||
|
public class MergeSequenceBenchmark
|
||||||
|
{
|
||||||
|
|
||||||
|
// Number of Sequences to Merge
|
||||||
|
@Param({"1000"})
|
||||||
|
int count;
|
||||||
|
|
||||||
|
// Number of elements in each sequence
|
||||||
|
@Param({"1000", "10000"})
|
||||||
|
int sequenceLength;
|
||||||
|
|
||||||
|
// Number of sequences to merge at once
|
||||||
|
@Param({"10", "100"})
|
||||||
|
int mergeAtOnce;
|
||||||
|
|
||||||
|
private List<Sequence<Integer>> sequences;
|
||||||
|
|
||||||
|
@Setup
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
Random rand = new Random(0);
|
||||||
|
sequences = Lists.newArrayList();
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
int[] sequence = new int[sequenceLength];
|
||||||
|
for (int j = 0; j < sequenceLength; j++) {
|
||||||
|
sequence[j] = rand.nextInt();
|
||||||
|
}
|
||||||
|
sequences.add(Sequences.simple(Ints.asList(sequence)));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||||
|
public void mergeHierarchical(Blackhole blackhole)
|
||||||
|
{
|
||||||
|
Iterator<Sequence<Integer>> iterator = sequences.iterator();
|
||||||
|
List<Sequence<Integer>> partialMerged = new ArrayList<Sequence<Integer>>();
|
||||||
|
List<Sequence<Integer>> toMerge = new ArrayList<Sequence<Integer>>();
|
||||||
|
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
toMerge.add(iterator.next());
|
||||||
|
if (toMerge.size() == mergeAtOnce) {
|
||||||
|
partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
|
||||||
|
toMerge = new ArrayList<Sequence<Integer>>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!toMerge.isEmpty()) {
|
||||||
|
partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
|
||||||
|
}
|
||||||
|
MergeSequence<Integer> mergeSequence = new MergeSequence(
|
||||||
|
Ordering.<Integer>natural(),
|
||||||
|
Sequences.simple(partialMerged)
|
||||||
|
);
|
||||||
|
Integer accumulate = mergeSequence.accumulate(
|
||||||
|
0, new Accumulator<Integer, Integer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Integer accumulate(Integer accumulated, Integer in)
|
||||||
|
{
|
||||||
|
return accumulated + in;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
blackhole.consume(accumulate);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||||
|
public void mergeFlat(final Blackhole blackhole)
|
||||||
|
{
|
||||||
|
MergeSequence<Integer> mergeSequence = new MergeSequence(Ordering.<Integer>natural(), Sequences.simple(sequences));
|
||||||
|
Integer accumulate = mergeSequence.accumulate(
|
||||||
|
0, new Accumulator<Integer, Integer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Integer accumulate(Integer accumulated, Integer in)
|
||||||
|
{
|
||||||
|
return accumulated + in;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
blackhole.consume(accumulate);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -27,7 +27,6 @@ import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Ordering;
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
@ -37,7 +36,6 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
import com.metamx.common.guava.BaseSequence;
|
import com.metamx.common.guava.BaseSequence;
|
||||||
import com.metamx.common.guava.Comparators;
|
|
||||||
import com.metamx.common.guava.LazySequence;
|
import com.metamx.common.guava.LazySequence;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
|
@ -64,11 +62,8 @@ import io.druid.timeline.DataSegment;
|
||||||
import io.druid.timeline.TimelineLookup;
|
import io.druid.timeline.TimelineLookup;
|
||||||
import io.druid.timeline.TimelineObjectHolder;
|
import io.druid.timeline.TimelineObjectHolder;
|
||||||
import io.druid.timeline.partition.PartitionChunk;
|
import io.druid.timeline.partition.PartitionChunk;
|
||||||
import org.joda.time.Interval;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -77,6 +72,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -491,36 +487,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||||
return Sequences.empty();
|
return Sequences.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
Collections.sort(
|
return toolChest.mergeSequencesUnordered(
|
||||||
sequencesByInterval,
|
Sequences.simple(
|
||||||
Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.<Interval, Sequence<T>>lhsFn())
|
Lists.transform(
|
||||||
|
sequencesByInterval,
|
||||||
|
new Function<Pair<Interval, Sequence<T>>, Sequence<T>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Sequence<T> apply(Pair<Interval, Sequence<T>> input)
|
||||||
|
{
|
||||||
|
return input.rhs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
// result sequences from overlapping intervals could start anywhere within that interval
|
|
||||||
// therefore we cannot assume any ordering with respect to the first result from each
|
|
||||||
// and must resort to calling toolchest.mergeSequencesUnordered for those.
|
|
||||||
Iterator<Pair<Interval, Sequence<T>>> iterator = sequencesByInterval.iterator();
|
|
||||||
Pair<Interval, Sequence<T>> current = iterator.next();
|
|
||||||
|
|
||||||
final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
|
|
||||||
List<Sequence<T>> unordered = Lists.newLinkedList();
|
|
||||||
|
|
||||||
unordered.add(current.rhs);
|
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
Pair<Interval, Sequence<T>> next = iterator.next();
|
|
||||||
if (!next.lhs.overlaps(current.lhs)) {
|
|
||||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
|
||||||
unordered = Lists.newLinkedList();
|
|
||||||
}
|
|
||||||
unordered.add(next.rhs);
|
|
||||||
current = next;
|
|
||||||
}
|
|
||||||
if (!unordered.isEmpty()) {
|
|
||||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
|
||||||
}
|
|
||||||
|
|
||||||
return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class CachePopulator
|
private static class CachePopulator
|
||||||
|
|
Loading…
Reference in New Issue