Adds a JMH benchmark (MergeSequenceBenchmark) that times MergeSequence over the
same randomly generated integer sequences, merged either all at once (mergeFlat)
or in groups of `mergeAtOnce` whose results are merged again (mergeHierarchical).
Also replaces the sort-and-group-by-overlap merge in CachingClusteredClient with
a single toolChest.mergeSequencesUnordered call over all per-interval sequences.

NOTE(review): line breaks, indentation and generic type arguments were
reconstructed from a flattened copy of this patch; hunk line counts were checked
against the hunk headers, but confirm whitespace against the upstream commit
before applying.

diff --git a/benchmarks/src/main/java/io/druid/benchmark/MergeSequenceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/MergeSequenceBenchmark.java
new file mode 100644
index 00000000000..71b186dd632
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/MergeSequenceBenchmark.java
@@ -0,0 +1,118 @@
+package io.druid.benchmark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Ints;
+import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.MergeSequence;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+public class MergeSequenceBenchmark
+{
+
+  // Number of Sequences to Merge
+  @Param({"1000"})
+  int count;
+
+  // Number of elements in each sequence
+  @Param({"1000", "10000"})
+  int sequenceLength;
+
+  // Number of sequences to merge at once
+  @Param({"10", "100"})
+  int mergeAtOnce;
+
+  private List<Sequence<Integer>> sequences;
+
+  @Setup
+  public void setup()
+  {
+    Random rand = new Random(0);
+    sequences = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      int[] sequence = new int[sequenceLength];
+      for (int j = 0; j < sequenceLength; j++) {
+        sequence[j] = rand.nextInt();
+      }
+      sequences.add(Sequences.simple(Ints.asList(sequence)));
+    }
+
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void mergeHierarchical(Blackhole blackhole)
+  {
+    Iterator<Sequence<Integer>> iterator = sequences.iterator();
+    List<Sequence<Integer>> partialMerged = new ArrayList<Sequence<Integer>>();
+    List<Sequence<Integer>> toMerge = new ArrayList<Sequence<Integer>>();
+
+    while (iterator.hasNext()) {
+      toMerge.add(iterator.next());
+      if (toMerge.size() == mergeAtOnce) {
+        partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
+        toMerge = new ArrayList<Sequence<Integer>>();
+      }
+    }
+
+    if (!toMerge.isEmpty()) {
+      partialMerged.add(new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(toMerge)));
+    }
+    MergeSequence<Integer> mergeSequence = new MergeSequence<Integer>(
+        Ordering.<Integer>natural(),
+        Sequences.simple(partialMerged)
+    );
+    Integer accumulate = mergeSequence.accumulate(
+        0, new Accumulator<Integer, Integer>()
+        {
+          @Override
+          public Integer accumulate(Integer accumulated, Integer in)
+          {
+            return accumulated + in;
+          }
+        }
+    );
+    blackhole.consume(accumulate);
+
+
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void mergeFlat(final Blackhole blackhole)
+  {
+    MergeSequence<Integer> mergeSequence = new MergeSequence<Integer>(Ordering.<Integer>natural(), Sequences.simple(sequences));
+    Integer accumulate = mergeSequence.accumulate(
+        0, new Accumulator<Integer, Integer>()
+        {
+          @Override
+          public Integer accumulate(Integer accumulated, Integer in)
+          {
+            return accumulated + in;
+          }
+        }
+    );
+    blackhole.consume(accumulate);
+
+  }
+
+
+}
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 9ed71c27cec..87d1a7eaeca 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -37,7 +36,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.BaseSequence;
-import com.metamx.common.guava.Comparators;
 import com.metamx.common.guava.LazySequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
@@ -64,11 +62,8 @@ import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.partition.PartitionChunk;
-import org.joda.time.Interval;
-
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +72,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import org.joda.time.Interval;
 
 /**
  */
@@ -491,36 +487,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       return Sequences.empty();
     }
 
-    Collections.sort(
-        sequencesByInterval,
-        Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.<Interval, Sequence<T>>lhsFn())
+    return toolChest.mergeSequencesUnordered(
+        Sequences.simple(
+            Lists.transform(
+                sequencesByInterval,
+                new Function<Pair<Interval, Sequence<T>>, Sequence<T>>()
+                {
+                  @Override
+                  public Sequence<T> apply(Pair<Interval, Sequence<T>> input)
+                  {
+                    return input.rhs;
+                  }
+                }
+            )
+        )
     );
-
-    // result sequences from overlapping intervals could start anywhere within that interval
-    // therefore we cannot assume any ordering with respect to the first result from each
-    // and must resort to calling toolchest.mergeSequencesUnordered for those.
-    Iterator<Pair<Interval, Sequence<T>>> iterator = sequencesByInterval.iterator();
-    Pair<Interval, Sequence<T>> current = iterator.next();
-
-    final List<Sequence<T>> orderedSequences = Lists.newLinkedList();
-    List<Sequence<T>> unordered = Lists.newLinkedList();
-
-    unordered.add(current.rhs);
-
-    while (iterator.hasNext()) {
-      Pair<Interval, Sequence<T>> next = iterator.next();
-      if (!next.lhs.overlaps(current.lhs)) {
-        orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
-        unordered = Lists.newLinkedList();
-      }
-      unordered.add(next.rhs);
-      current = next;
-    }
-    if (!unordered.isEmpty()) {
-      orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
-    }
-
-    return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences));
   }
 
   private static class CachePopulator