From ee019872f71f7cd5defada22b16b9e4e66af59cb Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 4 Nov 2014 18:43:55 -0800 Subject: [PATCH] TopN performance improvements Re-factor scanAndAggregate in PooledTopN * Loops are now a little bit tighter when looping over aggregates. This will hopefully assist in loop execution optimization. * Pre-calculated the aggregate offsets instead of shifting them during runtime. * Cursor loop could use some TLC, but would require a massive refactoring on how TopN queries are executed. * Any potential modifications to query workflow need to account for Stream vs Batch data, and that not all data will be array backed that comes in. Change data storage type in TopNNumericResultBuilder. * Use PriorityQueue to store * Checks to see if should even bother adding to Queue before adding. * Re-orders Queue on build() call. * Ideally the order would be directly preserved on build(), but this is a close second. Updates to CompressedObjectStrategy to support more compression types * Compression types are not yet dynamically configurable. * Added a benchmarking system for topN to test the compression * Updated pom.xml to include junit benchmarking * added an Uncompressed option --- processing/pom.xml | 7 + .../druid/query/topn/PooledTopNAlgorithm.java | 37 ++-- .../query/topn/TopNNumericResultBuilder.java | 153 +++++++++++------ .../data/CompressedObjectStrategy.java | 68 ++++++-- .../query/topn/TopNQueryRunnerBenchmark.java | 159 ++++++++++++++++++ 5 files changed, 343 insertions(+), 81 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java diff --git a/processing/pom.xml b/processing/pom.xml index 2f60f53d2bc..3fc7fd6242e 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -99,6 +99,12 @@ junit test + + com.carrotsearch + junit-benchmarks + 0.7.2 + test + org.easymock easymock @@ -107,6 +113,7 @@ com.google.caliper caliper + test diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index caf892f292d..6f0c527fd1b 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -158,29 +158,34 @@ public class PooledTopNAlgorithm final Cursor cursor = params.getCursor(); final DimensionSelector dimSelector = params.getDimSelector(); + final int[] aggregatorOffsets = new int[aggregatorSizes.length]; + for (int j = 0, offset = 0; j < aggregatorSizes.length; ++j) { + aggregatorOffsets[j] = offset; + offset += aggregatorSizes[j]; + } + while (!cursor.isDone()) { final IndexedInts dimValues = dimSelector.getRow(); for (int i = 0; i < dimValues.size(); ++i) { final int dimIndex = dimValues.get(i); int position = positions[dimIndex]; - switch (position) { - case SKIP_POSITION_VALUE: - break; - case INIT_POSITION_VALUE: - positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; - position = positions[dimIndex]; - for (int j = 0; j < theAggregators.length; ++j) { - theAggregators[j].init(resultsBuf, position); - position += aggregatorSizes[j]; - } - position = positions[dimIndex]; - default: - for (int j = 0; j < theAggregators.length; ++j) { - theAggregators[j].aggregate(resultsBuf, position); - position += aggregatorSizes[j]; - } + if (SKIP_POSITION_VALUE == position) { + continue; } + if (INIT_POSITION_VALUE == position) { + positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; + position = positions[dimIndex]; + for (int j = 0; j < theAggregators.length; ++j) { + theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]); + } + position = positions[dimIndex]; + } + for (int j = 0; j < theAggregators.length; ++j) { + theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); + } + + } cursor.advance(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 4a40f4bb2d5..62ddf139eae 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -19,8 +19,9 @@ package io.druid.query.topn; -import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; +import com.apple.concurrent.Dispatch; +import com.google.common.base.Function; +import com.google.common.collect.*; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; @@ -28,22 +29,25 @@ import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import javax.annotation.Nullable; +import java.util.*; /** + * */ public class TopNNumericResultBuilder implements TopNResultBuilder { + private final DateTime timestamp; private final DimensionSpec dimSpec; private final String metricName; private final List aggFactories; private final List postAggs; - private MinMaxPriorityQueue pQueue = null; + private final PriorityQueue pQueue; + private final Comparator dimValComparator; + private final Comparator dimNameComparator; + private final int threshold; + private final Comparator metricComparator; public TopNNumericResultBuilder( DateTime timestamp, @@ -60,18 +64,50 @@ public class TopNNumericResultBuilder implements TopNResultBuilder this.metricName = metricName; this.aggFactories = aggFactories; this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); + this.threshold = threshold; + this.metricComparator = comparator; + this.dimNameComparator = new Comparator() + { + @Override + public int compare(String o1, String o2) + { + int retval; + if (o1 == null) { + retval = -1; + } else if (o2 == null) { + retval = 1; + } else { + retval = o1.compareTo(o2); + } + return retval; + } + }; + this.dimValComparator = new Comparator() + { + @Override + public int compare(DimValHolder d1, DimValHolder d2) + { + int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal()); - instantiatePQueue(threshold, comparator); + if (retVal == 0) { + retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName()); + } + + return retVal; + } + }; + + pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator); } @Override - public TopNResultBuilder addEntry( + public TopNNumericResultBuilder addEntry( String dimName, Object dimValIndex, Object[] metricVals ) { - Map metricValues = Maps.newLinkedHashMap(); + final Map metricValues = Maps.newLinkedHashMap(); metricValues.put(dimSpec.getOutputName(), dimName); @@ -85,27 +121,47 @@ public class TopNNumericResultBuilder implements TopNResultBuilder } Object topNMetricVal = metricValues.get(metricName); - pQueue.add( - new DimValHolder.Builder().withTopNMetricVal(topNMetricVal) - .withDirName(dimName) - .withDimValIndex(dimValIndex) - .withMetricValues(metricValues) - .build() - ); + + if (shouldAdd(topNMetricVal)) { + DimValHolder dimValHolder = new DimValHolder.Builder() + .withTopNMetricVal(topNMetricVal) + .withDirName(dimName) + .withDimValIndex(dimValIndex) + .withMetricValues(metricValues) + .build(); + pQueue.add(dimValHolder); + } + if (this.pQueue.size() > this.threshold) { + pQueue.poll(); + } return this; } + private boolean shouldAdd(Object topNMetricVal) + { + final boolean belowThreshold = pQueue.size() < this.threshold; + final boolean belowMax = belowThreshold + || this.metricComparator.compare(pQueue.peek().getTopNMetricVal(), topNMetricVal) < 0; + return belowMax; + } + @Override public TopNResultBuilder addEntry(DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor) { - pQueue.add( - new DimValHolder.Builder().withTopNMetricVal(dimensionAndMetricValueExtractor.getDimensionValue(metricName)) - .withDirName(dimSpec.getOutputName()) - .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject()) - .build() - ); + final Object dimValue = dimensionAndMetricValueExtractor.getDimensionValue(metricName); + if (shouldAdd(dimValue)) { + final DimValHolder valHolder = new DimValHolder.Builder() + .withTopNMetricVal(dimValue) + .withDirName(dimensionAndMetricValueExtractor.getStringDimensionValue(dimSpec.getOutputName())) + .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject()) + .build(); + pQueue.add(valHolder); + } + if (pQueue.size() > this.threshold) { + pQueue.poll(); // throw away + } return this; } @@ -118,41 +174,40 @@ public class TopNNumericResultBuilder implements TopNResultBuilder @Override public Result build() { - // Pull out top aggregated values - List> values = new ArrayList>(pQueue.size()); - while (!pQueue.isEmpty()) { - values.add(pQueue.remove().getMetricValues()); - } - - return new Result( - timestamp, - new TopNResultValue(values) - ); - } - - private void instantiatePQueue(int threshold, final Comparator comparator) - { - this.pQueue = MinMaxPriorityQueue.orderedBy( - new Comparator() + final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]); + Arrays.sort( + holderValueArray, new Comparator() { @Override public int compare(DimValHolder d1, DimValHolder d2) { - int retVal = comparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal()); + int retVal = -metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal()); if (retVal == 0) { - if (d1.getDimName() == null) { - retVal = -1; - } else if (d2.getDimName() == null) { - retVal = 1; - } else { - retVal = d1.getDimName().compareTo(d2.getDimName()); - } + retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName()); } return retVal; } } - ).maximumSize(threshold).create(); + ); + List holderValues = Arrays.asList(holderValueArray); + + // Pull out top aggregated values + final List> values = Lists.transform( + holderValues, + new Function>() + { + @Override + public Map apply(DimValHolder valHolder) + { + return valHolder.getMetricValues(); + } + } + ); + return new Result( + timestamp, + new TopNResultValue(values) + ); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 0796b60f66b..0ea2d86cda9 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -49,13 +49,13 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return new LZFDecompressor(); + return LZFDecompressor.defaultDecompressor; } @Override public Compressor getCompressor() { - return new LZFCompressor(); + return LZFCompressor.defaultCompressor; } }, @@ -63,15 +63,26 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return new LZ4Decompressor(); + return LZ4Decompressor.defaultDecompressor; } @Override public Compressor getCompressor() { - return new LZ4Compressor(); + return LZ4Compressor.defaultCompressor; } - }; + }, + UNCOMPRESSED((byte)0x2){ + @Override + public Decompressor getDecompressor(){ + return UncompressedDecompressor.defaultDecompressor; + } + @Override + public Compressor getCompressor(){ + return UncompressedCompressor.defaultCompressor; + } + } + ; final byte id; @@ -120,9 +131,35 @@ public class CompressedObjectStrategy implements ObjectStrateg */ public byte[] compress(byte[] bytes); } + public static class UncompressedCompressor implements Compressor{ + private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor(); + @Override + public byte[] compress(byte[] bytes) { + return bytes; + } + } + public static class UncompressedDecompressor implements Decompressor{ + private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor(); + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { + final int maxCopy = Math.min(numBytes, out.remaining()); + final ByteBuffer copyBuffer = in.duplicate(); + copyBuffer.limit(copyBuffer.position() + maxCopy); + out.put(copyBuffer); + + // Setup the buffers properly + out.flip(); + in.position(in.position() + maxCopy); + } + @Override + public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) { + decompress(in, numBytes, out); + } + } public static class LZFDecompressor implements Decompressor { + private static final LZFDecompressor defaultDecompressor = new LZFDecompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -149,6 +186,7 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZFCompressor implements Compressor { + private static final LZFCompressor defaultCompressor = new LZFCompressor(); @Override public byte[] compress(byte[] bytes) { @@ -162,9 +200,9 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Decompressor implements Decompressor { - private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor(); - private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); - + private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor(); + private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); + private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -173,8 +211,7 @@ public class CompressedObjectStrategy implements ObjectStrateg try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); - final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length); - + final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length); out.put(outputBytes, 0, numDecompressedBytes); out.flip(); } @@ -189,6 +226,7 @@ public class CompressedObjectStrategy implements ObjectStrateg final byte[] bytes = new byte[numBytes]; in.get(bytes); + // TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize); @@ -204,16 +242,14 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Compressor implements Compressor { - private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor(); + private static final LZ4Compressor defaultCompressor = new LZ4Compressor(); + private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor(); + private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor(); @Override public byte[] compress(byte[] bytes) { - final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)]; - final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length); - final byte[] out = new byte[outputBytes]; - System.arraycopy(intermediate, 0, out, 0, outputBytes); - return out; + return lz4High.compress(bytes); } } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java new file mode 100644 index 00000000000..2abdf95636c --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java @@ -0,0 +1,159 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.topn; + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark; +import com.carrotsearch.junitbenchmarks.BenchmarkOptions; +import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.collections.StupidPool; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.MaxAggregatorFactory; +import io.druid.query.aggregation.MinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.TestIndex; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Based on TopNQueryRunnerTest + */ +//@Ignore // Don't need to actually run the benchmark every time +public class TopNQueryRunnerBenchmark extends AbstractBenchmark +{ + + public static enum TestCases + { + rtIndex, mMappedTestIndex, mergedRealtimeIndex, rtIndexOffheap + } + + private static final String marketDimension = "market"; + private static final String segmentId = "testSegment"; + + private static final HashMap context = new HashMap(); + + private static final TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(marketDimension) + .metric(QueryRunnerTestHelper.indexMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .build(); + private static final Map testCaseMap = Maps.newHashMap(); + + @BeforeClass + public static void setUp() throws Exception + { + QueryRunnerFactory factory = new TopNQueryRunnerFactory( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(2000); + } + } + ), + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + testCaseMap.put( + TestCases.rtIndex, + QueryRunnerTestHelper.makeQueryRunner( + factory, + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId) + ) + ); + testCaseMap.put( + TestCases.mMappedTestIndex, + QueryRunnerTestHelper.makeQueryRunner( + factory, + new QueryableIndexSegment(segmentId, TestIndex.getMMappedTestIndex()) + ) + ); + testCaseMap.put( + TestCases.mergedRealtimeIndex, + QueryRunnerTestHelper.makeQueryRunner( + factory, + new QueryableIndexSegment(segmentId, TestIndex.mergedRealtimeIndex()) + ) + ); + testCaseMap.put( + TestCases.rtIndexOffheap, + QueryRunnerTestHelper.makeQueryRunner( + factory, + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(true), segmentId) + ) + ); + //Thread.sleep(10000); + } + + @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000) + @Test + public void testmMapped() + { + testCaseMap.get(TestCases.mMappedTestIndex).run(query, context); + } + +/** + * These are not important + @BenchmarkOptions(warmupRounds = 10000,benchmarkRounds = 10000) + @Test public void testrtIndex(){ + testCaseMap.get(TestCases.rtIndex).run(query,context); + } + + @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000) + @Test public void testMerged(){ + testCaseMap.get(TestCases.mergedRealtimeIndex).run(query,context); + } + + @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000) + @Test public void testOffHeap(){ + testCaseMap.get(TestCases.rtIndexOffheap).run(query,context); + } + */ +}