diff --git a/pom.xml b/pom.xml index dba0c8c2207..fdc9acecf17 100644 --- a/pom.xml +++ b/pom.xml @@ -468,6 +468,12 @@ 4.11 test + + com.carrotsearch + junit-benchmarks + 0.7.2 + test + com.google.caliper caliper diff --git a/processing/pom.xml b/processing/pom.xml index 3fc7fd6242e..5455735499a 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -102,7 +102,6 @@ com.carrotsearch junit-benchmarks - 0.7.2 test diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 7291edfe7ce..a18e34516d1 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -164,6 +164,10 @@ public class PooledTopNAlgorithm offset += aggregatorSizes[j]; } + final int nAggregators = theAggregators.length; + final int extra = nAggregators - (nAggregators % 4) - 1; + final int ub = (nAggregators / 4) * 4; + while (!cursor.isDone()) { final IndexedInts dimValues = dimSelector.getRow(); @@ -177,18 +181,21 @@ public class PooledTopNAlgorithm if (INIT_POSITION_VALUE == position) { positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; position = positions[dimIndex]; - for (int j = 0; j < theAggregators.length; ++j) { + for (int j = 0; j < nAggregators; ++j) { theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]); } position = positions[dimIndex]; } - for (int j = 0; j < theAggregators.length; ++j) { + for (int j = 0; j < ub; j += 4) { + theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); + theAggregators[j+1].aggregate(resultsBuf, position + aggregatorOffsets[j+1]); + theAggregators[j+2].aggregate(resultsBuf, position + aggregatorOffsets[j+2]); + theAggregators[j+3].aggregate(resultsBuf, position + aggregatorOffsets[j+3]); + } + for(int j = extra; j < nAggregators; ++j) { theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); } - - } - cursor.advance(); } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 62ddf139eae..9c289ed3496 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -19,9 +19,8 @@ package io.druid.query.topn; -import com.apple.concurrent.Dispatch; import com.google.common.base.Function; -import com.google.common.collect.*; +import com.google.common.collect.Lists; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; @@ -29,8 +28,14 @@ import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; -import javax.annotation.Nullable; -import java.util.*; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + /** * @@ -45,7 +50,26 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final List postAggs; private final PriorityQueue pQueue; private final Comparator dimValComparator; - private final Comparator dimNameComparator; + private static final Comparator dimNameComparator = new Comparator() + { + @Override + public int compare(String o1, String o2) + { + int retval; + if (null == o1) { + if(null == o2){ + retval = 0; + }else { + retval = -1; + } + } else if (null == o2) { + retval = 1; + } else { + retval = o1.compareTo(o2); + } + return retval; + } + }; private final int threshold; private final Comparator metricComparator; @@ -66,22 +90,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); this.threshold = threshold; this.metricComparator = comparator; - this.dimNameComparator = new Comparator() - { - @Override - public int compare(String o1, String o2) - { - int retval; - if (o1 == null) { - retval = -1; - } else if (o2 == null) { - retval = 1; - } else { - retval = o1.compareTo(o2); - } - return retval; - } - }; this.dimValComparator = new Comparator() { @Override @@ -97,6 +105,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder } }; + // The logic in addEntry first adds, then removes if needed. So it can at any point have up to threshold + 1 entries. pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator); } @@ -107,7 +116,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder Object[] metricVals ) { - final Map metricValues = Maps.newLinkedHashMap(); + final Map metricValues = new LinkedHashMap<>(metricVals.length + postAggs.size()); metricValues.put(dimSpec.getOutputName(), dimName); @@ -181,7 +190,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder @Override public int compare(DimValHolder d1, DimValHolder d2) { - int retVal = -metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal()); + // Values flipped compared to earlier + int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal()); if (retVal == 0) { retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName()); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 0ea2d86cda9..0796b60f66b 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -49,13 +49,13 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return LZFDecompressor.defaultDecompressor; + return new LZFDecompressor(); } @Override public Compressor getCompressor() { - return LZFCompressor.defaultCompressor; + return new LZFCompressor(); } }, @@ -63,26 +63,15 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return LZ4Decompressor.defaultDecompressor; + return new LZ4Decompressor(); } @Override public Compressor getCompressor() { - return LZ4Compressor.defaultCompressor; + return new LZ4Compressor(); } - }, - UNCOMPRESSED((byte)0x2){ - @Override - public Decompressor getDecompressor(){ - return UncompressedDecompressor.defaultDecompressor; - } - @Override - public Compressor getCompressor(){ - return UncompressedCompressor.defaultCompressor; - } - } - ; + }; final byte id; @@ -131,35 +120,9 @@ public class CompressedObjectStrategy implements ObjectStrateg */ public byte[] compress(byte[] bytes); } - public static class UncompressedCompressor implements Compressor{ - private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor(); - @Override - public byte[] compress(byte[] bytes) { - return bytes; - } - } - public static class UncompressedDecompressor implements Decompressor{ - private static final UncompressedDecompressor defaultDecompressor = new UncompressedDecompressor(); - @Override - public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { - final int maxCopy = Math.min(numBytes, out.remaining()); - final ByteBuffer copyBuffer = in.duplicate(); - copyBuffer.limit(copyBuffer.position() + maxCopy); - out.put(copyBuffer); - - // Setup the buffers properly - out.flip(); - in.position(in.position() + maxCopy); - } - @Override - public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) { - decompress(in, numBytes, out); - } - } public static class LZFDecompressor implements Decompressor { - private static final LZFDecompressor defaultDecompressor = new LZFDecompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -186,7 +149,6 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZFCompressor implements Compressor { - private static final LZFCompressor defaultCompressor = new LZFCompressor(); @Override public byte[] compress(byte[] bytes) { @@ -200,9 +162,9 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Decompressor implements Decompressor { - private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor(); - private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); - private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor(); + private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor(); + private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); + @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -211,7 +173,8 @@ public class CompressedObjectStrategy implements ObjectStrateg try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); - final int numDecompressedBytes = lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length); + final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length); + out.put(outputBytes, 0, numDecompressedBytes); out.flip(); } @@ -226,7 +189,6 @@ public class CompressedObjectStrategy implements ObjectStrateg final byte[] bytes = new byte[numBytes]; in.get(bytes); - // TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize); @@ -242,14 +204,16 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Compressor implements Compressor { - private static final LZ4Compressor defaultCompressor = new LZ4Compressor(); - private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor(); - private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor(); + private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor(); @Override public byte[] compress(byte[] bytes) { - return lz4High.compress(bytes); + final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)]; + final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length); + final byte[] out = new byte[outputBytes]; + System.arraycopy(intermediate, 0, out, 0, outputBytes); + return out; } }