Merge remote-tracking branch 'origin/druid-0.7.x-fastTopN-rebase' into druid-0.7.x-fastTopN-rebase

This commit is contained in:
Charles Allen 2014-11-10 12:52:36 -08:00
commit bfc9d9f283
5 changed files with 67 additions and 81 deletions

View File

@ -468,6 +468,12 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
<version>0.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>

View File

@ -102,7 +102,6 @@
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
<version>0.7.2</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -164,6 +164,10 @@ public class PooledTopNAlgorithm
offset += aggregatorSizes[j];
}
final int nAggregators = theAggregators.length;
final int extra = nAggregators - (nAggregators % 4) - 1;
final int ub = (nAggregators / 4) * 4;
while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow();
@ -177,18 +181,21 @@ public class PooledTopNAlgorithm
if (INIT_POSITION_VALUE == position) {
positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord;
position = positions[dimIndex];
for (int j = 0; j < theAggregators.length; ++j) {
for (int j = 0; j < nAggregators; ++j) {
theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]);
}
position = positions[dimIndex];
}
for (int j = 0; j < theAggregators.length; ++j) {
for (int j = 0; j < ub; j += 4) {
theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]);
theAggregators[j+1].aggregate(resultsBuf, position + aggregatorOffsets[j+1]);
theAggregators[j+2].aggregate(resultsBuf, position + aggregatorOffsets[j+2]);
theAggregators[j+3].aggregate(resultsBuf, position + aggregatorOffsets[j+3]);
}
for(int j = extra; j < nAggregators; ++j) {
theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]);
}
}
cursor.advance();
}
}

View File

@ -19,9 +19,8 @@
package io.druid.query.topn;
import com.apple.concurrent.Dispatch;
import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.collect.Lists;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
@ -29,8 +28,14 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
/**
*
@ -45,7 +50,26 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
private final List<PostAggregator> postAggs;
private final PriorityQueue<DimValHolder> pQueue;
private final Comparator<DimValHolder> dimValComparator;
private final Comparator<String> dimNameComparator;
private static final Comparator<String> dimNameComparator = new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
int retval;
if (null == o1) {
if(null == o2){
retval = 0;
}else {
retval = -1;
}
} else if (null == o2) {
retval = 1;
} else {
retval = o1.compareTo(o2);
}
return retval;
}
};
private final int threshold;
private final Comparator metricComparator;
@ -66,22 +90,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
this.threshold = threshold;
this.metricComparator = comparator;
this.dimNameComparator = new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
int retval;
if (o1 == null) {
retval = -1;
} else if (o2 == null) {
retval = 1;
} else {
retval = o1.compareTo(o2);
}
return retval;
}
};
this.dimValComparator = new Comparator<DimValHolder>()
{
@Override
@ -97,6 +105,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
}
};
// The logic in addEntry first adds, then removes if needed. So it can at any point have up to threshold + 1 entries.
pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
}
@ -107,7 +116,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
Object[] metricVals
)
{
final Map<String, Object> metricValues = Maps.newLinkedHashMap();
final Map<String, Object> metricValues = new LinkedHashMap<>(metricVals.length + postAggs.size());
metricValues.put(dimSpec.getOutputName(), dimName);
@ -181,7 +190,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
@Override
public int compare(DimValHolder d1, DimValHolder d2)
{
int retVal = -metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
// Values flipped compared to earlier
int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
if (retVal == 0) {
retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());

View File

@ -49,13 +49,13 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
@Override
public Decompressor getDecompressor()
{
return LZFDecompressor.defaultDecompressor;
return new LZFDecompressor();
}
@Override
public Compressor getCompressor()
{
return LZFCompressor.defaultCompressor;
return new LZFCompressor();
}
},
@ -63,26 +63,15 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
@Override
public Decompressor getDecompressor()
{
return LZ4Decompressor.defaultDecompressor;
return new LZ4Decompressor();
}
@Override
public Compressor getCompressor()
{
return LZ4Compressor.defaultCompressor;
return new LZ4Compressor();
}
},
UNCOMPRESSED((byte)0x2){
@Override
public Decompressor getDecompressor(){
return UncompressedDecompressor.defaultDecompressor;
}
@Override
public Compressor getCompressor(){
return UncompressedCompressor.defaultCompressor;
}
}
;
};
final byte id;
@ -131,35 +120,9 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
*/
public byte[] compress(byte[] bytes);
}
public static class UncompressedCompressor implements Compressor{
private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor();
@Override
public byte[] compress(byte[] bytes) {
return bytes;
}
}
public static class UncompressedDecompressor implements Decompressor{
private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) {
final int maxCopy = Math.min(numBytes, out.remaining());
final ByteBuffer copyBuffer = in.duplicate();
copyBuffer.limit(copyBuffer.position() + maxCopy);
out.put(copyBuffer);
// Setup the buffers properly
out.flip();
in.position(in.position() + maxCopy);
}
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) {
decompress(in, numBytes, out);
}
}
public static class LZFDecompressor implements Decompressor
{
private static final LZFDecompressor defaultDecompressor = new LZFDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
@ -186,7 +149,6 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
public static class LZFCompressor implements Compressor
{
private static final LZFCompressor defaultCompressor = new LZFCompressor();
@Override
public byte[] compress(byte[] bytes)
{
@ -200,9 +162,9 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
public static class LZ4Decompressor implements Decompressor
{
private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor();
private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor();
private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor();
private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
@ -211,7 +173,8 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length);
final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length);
out.put(outputBytes, 0, numDecompressedBytes);
out.flip();
}
@ -226,7 +189,6 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
final byte[] bytes = new byte[numBytes];
in.get(bytes);
// TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize);
@ -242,14 +204,16 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
public static class LZ4Compressor implements Compressor
{
private static final LZ4Compressor defaultCompressor = new LZ4Compressor();
private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor();
private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor();
private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor();
@Override
public byte[] compress(byte[] bytes)
{
return lz4High.compress(bytes);
final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)];
final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length);
final byte[] out = new byte[outputBytes];
System.arraycopy(intermediate, 0, out, 0, outputBytes);
return out;
}
}