TopN performance improvements

Refactor scanAndAggregate() in PooledTopNAlgorithm

* The loops over the aggregators are now a little tighter, which should help the JIT optimize loop execution.
* Pre-calculate the aggregator offsets once instead of shifting a running position at runtime (see the sketch after this list).
* The cursor loop could still use some TLC, but that would require a massive refactoring of how TopN queries are executed.
  * Any potential modification to the query workflow needs to account for streaming vs. batch data, and for the fact that not all incoming data is array-backed.
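
Roughly, the offset pre-calculation hoists the per-aggregator position math out of the hot loop. A minimal standalone sketch of the idea, where Agg and the method names are illustrative stand-ins rather than the actual Druid BufferAggregator API:

import java.nio.ByteBuffer;

// Illustrative only: "Agg" stands in for the real buffer-aggregator interface.
interface Agg
{
  void aggregate(ByteBuffer buf, int position);
}

class OffsetPrecalcSketch
{
  static void scan(Agg[] aggs, int[] aggSizes, ByteBuffer resultsBuf, int[] rowPositions)
  {
    // Compute each aggregator's offset within a record once, up front.
    final int[] offsets = new int[aggSizes.length];
    for (int j = 0, offset = 0; j < aggSizes.length; ++j) {
      offsets[j] = offset;
      offset += aggSizes[j];
    }
    // The hot loop is now an add plus an array lookup instead of a mutating running position.
    for (final int position : rowPositions) {
      for (int j = 0; j < aggs.length; ++j) {
        aggs[j].aggregate(resultsBuf, position + offsets[j]);
      }
    }
  }
}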

Change the data storage type in TopNNumericResultBuilder.

  * Use a PriorityQueue to store results.
  * Check whether an entry can make the top N at all before adding it to the queue (sketched after this list).
  * Re-order the queue contents on the build() call.
  * Ideally the order would be preserved directly at build() time, but this is a close second.
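
The queue change follows the usual bounded min-heap pattern for top-N: keep at most threshold entries, reject values that cannot beat the current minimum, and sort only once when results are built. A minimal sketch under those assumptions; BoundedTopN is illustrative, not the Druid class:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal sketch of the bounded-queue pattern described above (illustrative only).
class BoundedTopN<T>
{
  private final PriorityQueue<T> queue;   // min-heap: head is the weakest entry kept so far
  private final Comparator<T> comparator; // ascending by metric
  private final int threshold;

  BoundedTopN(int threshold, Comparator<T> comparator)
  {
    this.threshold = threshold;
    this.comparator = comparator;
    this.queue = new PriorityQueue<>(threshold + 1, comparator);
  }

  void add(T value)
  {
    // Cheap pre-check: skip values that cannot displace the current minimum.
    if (queue.size() < threshold || comparator.compare(queue.peek(), value) < 0) {
      queue.add(value);
    }
    if (queue.size() > threshold) {
      queue.poll(); // throw away the weakest entry
    }
  }

  List<T> build()
  {
    // The heap is only partially ordered, so sort (descending) once at the end.
    final List<T> result = new ArrayList<>(queue);
    result.sort(comparator.reversed());
    return result;
  }
}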

Updates to CompressedObjectStrategy to support more compression types

 * Compression types are not yet dynamically configurable.
 * Added a benchmarking system for topN queries to test the compression types (basic junit-benchmarks usage is sketched below).
 * Updated pom.xml to include the junit-benchmarks dependency.
 * Added an UNCOMPRESSED option.
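
For reference, junit-benchmarks (com.carrotsearch.junitbenchmarks) drives ordinary JUnit tests as benchmarks through a base class and an annotation. A minimal usage sketch with a placeholder workload, not the actual topN runner:

import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import org.junit.Test;

// Minimal junit-benchmarks usage sketch; the body being timed is a placeholder.
public class ExampleBenchmark extends AbstractBenchmark
{
  @BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 1000)
  @Test
  public void timeSomething()
  {
    long acc = 0;
    for (int i = 0; i < 1000000; i++) {
      acc += i;
    }
    // Use the result so the JIT cannot eliminate the loop entirely.
    if (acc == 42) {
      throw new IllegalStateException("unexpected");
    }
  }
}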
Author: Charles Allen
Committed by: Charles Allen
Date: 2014-11-04 18:43:55 -08:00
Parent: 8b1edfd464
Commit: ee019872f7
5 changed files with 343 additions and 81 deletions

pom.xml

@@ -99,6 +99,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.carrotsearch</groupId>
+      <artifactId>junit-benchmarks</artifactId>
+      <version>0.7.2</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
@@ -107,6 +113,7 @@
     <dependency>
       <groupId>com.google.caliper</groupId>
       <artifactId>caliper</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>

PooledTopNAlgorithm.java

@@ -158,29 +158,34 @@ public class PooledTopNAlgorithm
     final Cursor cursor = params.getCursor();
     final DimensionSelector dimSelector = params.getDimSelector();

+    final int[] aggregatorOffsets = new int[aggregatorSizes.length];
+    for (int j = 0, offset = 0; j < aggregatorSizes.length; ++j) {
+      aggregatorOffsets[j] = offset;
+      offset += aggregatorSizes[j];
+    }
+
     while (!cursor.isDone()) {
       final IndexedInts dimValues = dimSelector.getRow();
       for (int i = 0; i < dimValues.size(); ++i) {
         final int dimIndex = dimValues.get(i);
         int position = positions[dimIndex];
-        switch (position) {
-          case SKIP_POSITION_VALUE:
-            break;
-          case INIT_POSITION_VALUE:
-            positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord;
-            position = positions[dimIndex];
-            for (int j = 0; j < theAggregators.length; ++j) {
-              theAggregators[j].init(resultsBuf, position);
-              position += aggregatorSizes[j];
-            }
-            position = positions[dimIndex];
-          default:
-            for (int j = 0; j < theAggregators.length; ++j) {
-              theAggregators[j].aggregate(resultsBuf, position);
-              position += aggregatorSizes[j];
-            }
-        }
+        if (SKIP_POSITION_VALUE == position) {
+          continue;
+        }
+        if (INIT_POSITION_VALUE == position) {
+          positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord;
+          position = positions[dimIndex];
+          for (int j = 0; j < theAggregators.length; ++j) {
+            theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]);
+          }
+          position = positions[dimIndex];
+        }
+        for (int j = 0; j < theAggregators.length; ++j) {
+          theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]);
+        }
       }
       cursor.advance();

TopNNumericResultBuilder.java

@@ -19,8 +19,9 @@
 package io.druid.query.topn;

-import com.google.common.collect.Maps;
-import com.google.common.collect.MinMaxPriorityQueue;
+import com.apple.concurrent.Dispatch;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
 import io.druid.query.Result;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.AggregatorUtil;
@@ -28,22 +29,25 @@ import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.dimension.DimensionSpec;
 import org.joda.time.DateTime;

-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import javax.annotation.Nullable;
+import java.util.*;

 /**
- *
  */
 public class TopNNumericResultBuilder implements TopNResultBuilder
 {
   private final DateTime timestamp;
   private final DimensionSpec dimSpec;
   private final String metricName;
   private final List<AggregatorFactory> aggFactories;
   private final List<PostAggregator> postAggs;
-  private MinMaxPriorityQueue<DimValHolder> pQueue = null;
+  private final PriorityQueue<DimValHolder> pQueue;
+  private final Comparator<DimValHolder> dimValComparator;
+  private final Comparator<String> dimNameComparator;
+  private final int threshold;
+  private final Comparator metricComparator;

   public TopNNumericResultBuilder(
       DateTime timestamp,
@@ -60,18 +64,50 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     this.metricName = metricName;
     this.aggFactories = aggFactories;
     this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
-
-    instantiatePQueue(threshold, comparator);
+    this.threshold = threshold;
+    this.metricComparator = comparator;
+    this.dimNameComparator = new Comparator<String>()
+    {
+      @Override
+      public int compare(String o1, String o2)
+      {
+        int retval;
+        if (o1 == null) {
+          retval = -1;
+        } else if (o2 == null) {
+          retval = 1;
+        } else {
+          retval = o1.compareTo(o2);
+        }
+        return retval;
+      }
+    };
+    this.dimValComparator = new Comparator<DimValHolder>()
+    {
+      @Override
+      public int compare(DimValHolder d1, DimValHolder d2)
+      {
+        int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
+        if (retVal == 0) {
+          retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
+        }
+        return retVal;
+      }
+    };
+    pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
   }

   @Override
-  public TopNResultBuilder addEntry(
+  public TopNNumericResultBuilder addEntry(
       String dimName,
       Object dimValIndex,
       Object[] metricVals
   )
   {
-    Map<String, Object> metricValues = Maps.newLinkedHashMap();
+    final Map<String, Object> metricValues = Maps.newLinkedHashMap();

     metricValues.put(dimSpec.getOutputName(), dimName);
@@ -85,27 +121,47 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     }

     Object topNMetricVal = metricValues.get(metricName);
-    pQueue.add(
-        new DimValHolder.Builder().withTopNMetricVal(topNMetricVal)
-                                  .withDirName(dimName)
-                                  .withDimValIndex(dimValIndex)
-                                  .withMetricValues(metricValues)
-                                  .build()
-    );
+
+    if (shouldAdd(topNMetricVal)) {
+      DimValHolder dimValHolder = new DimValHolder.Builder()
+          .withTopNMetricVal(topNMetricVal)
+          .withDirName(dimName)
+          .withDimValIndex(dimValIndex)
+          .withMetricValues(metricValues)
+          .build();
+      pQueue.add(dimValHolder);
+    }
+    if (this.pQueue.size() > this.threshold) {
+      pQueue.poll();
+    }
     return this;
   }

+  private boolean shouldAdd(Object topNMetricVal)
+  {
+    final boolean belowThreshold = pQueue.size() < this.threshold;
+    final boolean belowMax = belowThreshold
+                             || this.metricComparator.compare(pQueue.peek().getTopNMetricVal(), topNMetricVal) < 0;
+    return belowMax;
+  }
+
   @Override
   public TopNResultBuilder addEntry(DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor)
   {
-    pQueue.add(
-        new DimValHolder.Builder().withTopNMetricVal(dimensionAndMetricValueExtractor.getDimensionValue(metricName))
-                                  .withDirName(dimSpec.getOutputName())
-                                  .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
-                                  .build()
-    );
+    final Object dimValue = dimensionAndMetricValueExtractor.getDimensionValue(metricName);
+
+    if (shouldAdd(dimValue)) {
+      final DimValHolder valHolder = new DimValHolder.Builder()
+          .withTopNMetricVal(dimValue)
+          .withDirName(dimensionAndMetricValueExtractor.getStringDimensionValue(dimSpec.getOutputName()))
+          .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
+          .build();
+      pQueue.add(valHolder);
+    }
+    if (pQueue.size() > this.threshold) {
+      pQueue.poll(); // throw away
+    }
     return this;
   }
@@ -118,41 +174,40 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
   @Override
   public Result<TopNResultValue> build()
   {
-    // Pull out top aggregated values
-    List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(pQueue.size());
-    while (!pQueue.isEmpty()) {
-      values.add(pQueue.remove().getMetricValues());
-    }
-    return new Result<TopNResultValue>(
-        timestamp,
-        new TopNResultValue(values)
-    );
-  }
-
-  private void instantiatePQueue(int threshold, final Comparator comparator)
-  {
-    this.pQueue = MinMaxPriorityQueue.orderedBy(
-        new Comparator<DimValHolder>()
+    final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]);
+    Arrays.sort(
+        holderValueArray, new Comparator<DimValHolder>()
         {
           @Override
           public int compare(DimValHolder d1, DimValHolder d2)
           {
-            int retVal = comparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
+            int retVal = -metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
             if (retVal == 0) {
-              if (d1.getDimName() == null) {
-                retVal = -1;
-              } else if (d2.getDimName() == null) {
-                retVal = 1;
-              } else {
-                retVal = d1.getDimName().compareTo(d2.getDimName());
-              }
+              retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
             }
             return retVal;
           }
         }
-    ).maximumSize(threshold).create();
+    );
+    List<DimValHolder> holderValues = Arrays.asList(holderValueArray);
+
+    // Pull out top aggregated values
+    final List<Map<String, Object>> values = Lists.transform(
+        holderValues,
+        new Function<DimValHolder, Map<String, Object>>()
+        {
+          @Override
+          public Map<String, Object> apply(DimValHolder valHolder)
+          {
+            return valHolder.getMetricValues();
+          }
+        }
+    );
+    return new Result<TopNResultValue>(
+        timestamp,
+        new TopNResultValue(values)
+    );
   }
 }

CompressedObjectStrategy.java

@@ -49,13 +49,13 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
     @Override
     public Decompressor getDecompressor()
     {
-      return new LZFDecompressor();
+      return LZFDecompressor.defaultDecompressor;
     }

     @Override
     public Compressor getCompressor()
     {
-      return new LZFCompressor();
+      return LZFCompressor.defaultCompressor;
     }
   },
@@ -63,15 +63,26 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
     @Override
     public Decompressor getDecompressor()
     {
-      return new LZ4Decompressor();
+      return LZ4Decompressor.defaultDecompressor;
     }

     @Override
     public Compressor getCompressor()
     {
-      return new LZ4Compressor();
+      return LZ4Compressor.defaultCompressor;
     }
-  };
+  },
+  UNCOMPRESSED((byte) 0x2) {
+    @Override
+    public Decompressor getDecompressor()
+    {
+      return UncompressedDecompressor.defaultDecompressor;
+    }
+
+    @Override
+    public Compressor getCompressor()
+    {
+      return UncompressedCompressor.defaultCompressor;
+    }
+  };

   final byte id;
@@ -120,9 +131,35 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
      */
     public byte[] compress(byte[] bytes);
   }

+  public static class UncompressedCompressor implements Compressor
+  {
+    private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor();
+
+    @Override
+    public byte[] compress(byte[] bytes)
+    {
+      return bytes;
+    }
+  }
+
+  public static class UncompressedDecompressor implements Decompressor
+  {
+    private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor();
+
+    @Override
+    public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+    {
+      final int maxCopy = Math.min(numBytes, out.remaining());
+      final ByteBuffer copyBuffer = in.duplicate();
+      copyBuffer.limit(copyBuffer.position() + maxCopy);
+      out.put(copyBuffer);
+      // Setup the buffers properly
+      out.flip();
+      in.position(in.position() + maxCopy);
+    }
+
+    @Override
+    public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize)
+    {
+      decompress(in, numBytes, out);
+    }
+  }
+
   public static class LZFDecompressor implements Decompressor
   {
+    private static final LZFDecompressor defaultDecompressor = new LZFDecompressor();
+
     @Override
     public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
     {
@@ -149,6 +186,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
   public static class LZFCompressor implements Compressor
   {
+    private static final LZFCompressor defaultCompressor = new LZFCompressor();
+
     @Override
     public byte[] compress(byte[] bytes)
     {
@@ -162,9 +200,9 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
   public static class LZ4Decompressor implements Decompressor
   {
-    private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
-    private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
+    private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor();
+    private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
+    private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor();

     @Override
     public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
     {
@@ -173,8 +211,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
       try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
         final byte[] outputBytes = outputBytesHolder.get();
-        final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
+        final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length);

         out.put(outputBytes, 0, numDecompressedBytes);
         out.flip();
       }
@@ -189,6 +226,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
       final byte[] bytes = new byte[numBytes];
       in.get(bytes);

+      // TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java
       try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
         final byte[] outputBytes = outputBytesHolder.get();
         lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
@@ -204,16 +242,14 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
   public static class LZ4Compressor implements Compressor
   {
-    private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
+    private static final LZ4Compressor defaultCompressor = new LZ4Compressor();
+    private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor();
+    private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor();

     @Override
     public byte[] compress(byte[] bytes)
     {
-      final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
-      final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
-      final byte[] out = new byte[outputBytes];
-      System.arraycopy(intermediate, 0, out, 0, outputBytes);
-      return out;
+      return lz4High.compress(bytes);
     }
   }

TopNQueryRunnerBenchmark.java (new file)

@@ -0,0 +1,159 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.query.topn;

import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.collections.StupidPool;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestIndex;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Based on TopNQueryRunnerTest
 */
//@Ignore // Don't need to actually run the benchmark every time
public class TopNQueryRunnerBenchmark extends AbstractBenchmark
{
  public static enum TestCases
  {
    rtIndex, mMappedTestIndex, mergedRealtimeIndex, rtIndexOffheap
  }

  private static final String marketDimension = "market";
  private static final String segmentId = "testSegment";
  private static final HashMap<String, Object> context = new HashMap<String, Object>();
  private static final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.dataSource)
      .granularity(QueryRunnerTestHelper.allGran)
      .dimension(marketDimension)
      .metric(QueryRunnerTestHelper.indexMetric)
      .threshold(4)
      .intervals(QueryRunnerTestHelper.fullOnInterval)
      .aggregators(
          Lists.<AggregatorFactory>newArrayList(
              Iterables.concat(
                  QueryRunnerTestHelper.commonAggregators,
                  Lists.newArrayList(
                      new MaxAggregatorFactory("maxIndex", "index"),
                      new MinAggregatorFactory("minIndex", "index")
                  )
              )
          )
      )
      .postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
      .build();
  private static final Map<TestCases, QueryRunner> testCaseMap = Maps.newHashMap();

  @BeforeClass
  public static void setUp() throws Exception
  {
    QueryRunnerFactory factory = new TopNQueryRunnerFactory(
        new StupidPool<ByteBuffer>(
            new Supplier<ByteBuffer>()
            {
              @Override
              public ByteBuffer get()
              {
                return ByteBuffer.allocate(2000);
              }
            }
        ),
        new TopNQueryQueryToolChest(new TopNQueryConfig()),
        QueryRunnerTestHelper.NOOP_QUERYWATCHER
    );
    testCaseMap.put(
        TestCases.rtIndex,
        QueryRunnerTestHelper.makeQueryRunner(
            factory,
            new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
        )
    );
    testCaseMap.put(
        TestCases.mMappedTestIndex,
        QueryRunnerTestHelper.makeQueryRunner(
            factory,
            new QueryableIndexSegment(segmentId, TestIndex.getMMappedTestIndex())
        )
    );
    testCaseMap.put(
        TestCases.mergedRealtimeIndex,
        QueryRunnerTestHelper.makeQueryRunner(
            factory,
            new QueryableIndexSegment(segmentId, TestIndex.mergedRealtimeIndex())
        )
    );
    testCaseMap.put(
        TestCases.rtIndexOffheap,
        QueryRunnerTestHelper.makeQueryRunner(
            factory,
            new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(true), segmentId)
        )
    );
    //Thread.sleep(10000);
  }

  @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000)
  @Test
  public void testmMapped()
  {
    testCaseMap.get(TestCases.mMappedTestIndex).run(query, context);
  }

  /**
   * These are not important
   @BenchmarkOptions(warmupRounds = 10000,benchmarkRounds = 10000)
   @Test public void testrtIndex(){
     testCaseMap.get(TestCases.rtIndex).run(query,context);
   }
   @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000)
   @Test public void testMerged(){
     testCaseMap.get(TestCases.mergedRealtimeIndex).run(query,context);
   }
   @BenchmarkOptions(warmupRounds = 10000, benchmarkRounds = 10000)
   @Test public void testOffHeap(){
     testCaseMap.get(TestCases.rtIndexOffheap).run(query,context);
   }
   */
}