From 956bdf80dd433075b0e865e5d3e1b53a47897688 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 10:29:06 -0800 Subject: [PATCH 1/9] Removed negating comparator in TopNNumericResultBuilder --- .../java/io/druid/query/topn/TopNNumericResultBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 62ddf139eae..e3d66c49955 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -181,7 +181,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder @Override public int compare(DimValHolder d1, DimValHolder d2) { - int retVal = -metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal()); + // Values flipped compared to earlier + int retVal = metricComparator.compare(d2.getTopNMetricVal(),d1.getTopNMetricVal()); if (retVal == 0) { retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName()); From 228614ddb50cb832cec42884a1160ebfd23fe4df Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 10:34:28 -0800 Subject: [PATCH 2/9] Changed TopNNumericResultBuilder imports to not use package.*, but instead pulls all in explicitly --- .../query/topn/TopNNumericResultBuilder.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index e3d66c49955..71d5dfcbaf9 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -19,9 +19,9 @@ package io.druid.query.topn; -import com.apple.concurrent.Dispatch; import 
com.google.common.base.Function; -import com.google.common.collect.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; @@ -29,8 +29,13 @@ import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; -import javax.annotation.Nullable; -import java.util.*; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + /** * @@ -182,7 +187,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder public int compare(DimValHolder d1, DimValHolder d2) { // Values flipped compared to earlier - int retVal = metricComparator.compare(d2.getTopNMetricVal(),d1.getTopNMetricVal()); + int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal()); if (retVal == 0) { retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName()); From 0a562ebd771bc5f27e1aa6608e694a652869a34b Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 10:42:28 -0800 Subject: [PATCH 3/9] Removed CompressedObjectStrategy from this pull request. 
will submit a new one later --- .../data/CompressedObjectStrategy.java | 68 +++++-------------- 1 file changed, 16 insertions(+), 52 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 0ea2d86cda9..0796b60f66b 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -49,13 +49,13 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return LZFDecompressor.defaultDecompressor; + return new LZFDecompressor(); } @Override public Compressor getCompressor() { - return LZFCompressor.defaultCompressor; + return new LZFCompressor(); } }, @@ -63,26 +63,15 @@ public class CompressedObjectStrategy implements ObjectStrateg @Override public Decompressor getDecompressor() { - return LZ4Decompressor.defaultDecompressor; + return new LZ4Decompressor(); } @Override public Compressor getCompressor() { - return LZ4Compressor.defaultCompressor; + return new LZ4Compressor(); } - }, - UNCOMPRESSED((byte)0x2){ - @Override - public Decompressor getDecompressor(){ - return UncompressedDecompressor.defaultDecompressor; - } - @Override - public Compressor getCompressor(){ - return UncompressedCompressor.defaultCompressor; - } - } - ; + }; final byte id; @@ -131,35 +120,9 @@ public class CompressedObjectStrategy implements ObjectStrateg */ public byte[] compress(byte[] bytes); } - public static class UncompressedCompressor implements Compressor{ - private static final UncompressedCompressor defaultCompressor = new UncompressedCompressor(); - @Override - public byte[] compress(byte[] bytes) { - return bytes; - } - } - public static class UncompressedDecompressor implements Decompressor{ - private static final UncompressedDecompressor defaultDecompressor = new 
UncompressedDecompressor(); - @Override - public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { - final int maxCopy = Math.min(numBytes, out.remaining()); - final ByteBuffer copyBuffer = in.duplicate(); - copyBuffer.limit(copyBuffer.position() + maxCopy); - out.put(copyBuffer); - - // Setup the buffers properly - out.flip(); - in.position(in.position() + maxCopy); - } - @Override - public void decompress(ByteBuffer in, int numBytes, ByteBuffer out, int decompressedSize) { - decompress(in, numBytes, out); - } - } public static class LZFDecompressor implements Decompressor { - private static final LZFDecompressor defaultDecompressor = new LZFDecompressor(); @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -186,7 +149,6 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZFCompressor implements Compressor { - private static final LZFCompressor defaultCompressor = new LZFCompressor(); @Override public byte[] compress(byte[] bytes) { @@ -200,9 +162,9 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Decompressor implements Decompressor { - private static final LZ4SafeDecompressor lz4Safe = LZ4Factory.fastestJavaInstance().safeDecompressor(); - private static final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); - private static final LZ4Decompressor defaultDecompressor = new LZ4Decompressor(); + private final LZ4SafeDecompressor lz4 = LZ4Factory.fastestJavaInstance().safeDecompressor(); + private final LZ4FastDecompressor lz4Fast = LZ4Factory.fastestJavaInstance().fastDecompressor(); + @Override public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) { @@ -211,7 +173,8 @@ public class CompressedObjectStrategy implements ObjectStrateg try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); - final int numDecompressedBytes = 
lz4Fast.decompress(bytes, 0, outputBytes, 0, outputBytes.length); + final int numDecompressedBytes = lz4.decompress(bytes, 0, bytes.length, outputBytes, 0, outputBytes.length); + out.put(outputBytes, 0, numDecompressedBytes); out.flip(); } @@ -226,7 +189,6 @@ public class CompressedObjectStrategy implements ObjectStrateg final byte[] bytes = new byte[numBytes]; in.get(bytes); - // TODO: Upgrade this to ByteBuffer once https://github.com/jpountz/lz4-java/issues/9 is in mainline code for lz4-java try (final ResourceHolder outputBytesHolder = CompressedPools.getOutputBytes()) { final byte[] outputBytes = outputBytesHolder.get(); lz4Fast.decompress(bytes, 0, outputBytes, 0, decompressedSize); @@ -242,14 +204,16 @@ public class CompressedObjectStrategy implements ObjectStrateg public static class LZ4Compressor implements Compressor { - private static final LZ4Compressor defaultCompressor = new LZ4Compressor(); - private static final net.jpountz.lz4.LZ4Compressor lz4Fast = LZ4Factory.fastestJavaInstance().fastCompressor(); - private static final net.jpountz.lz4.LZ4Compressor lz4High = LZ4Factory.fastestJavaInstance().highCompressor(); + private final net.jpountz.lz4.LZ4Compressor lz4 = LZ4Factory.fastestJavaInstance().highCompressor(); @Override public byte[] compress(byte[] bytes) { - return lz4High.compress(bytes); + final byte[] intermediate = new byte[lz4.maxCompressedLength(bytes.length)]; + final int outputBytes = lz4.compress(bytes, 0, bytes.length, intermediate, 0, intermediate.length); + final byte[] out = new byte[outputBytes]; + System.arraycopy(intermediate, 0, out, 0, outputBytes); + return out; } } From bc92de233c1d04e2427e56393f5597a26dc3654a Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 10:54:10 -0800 Subject: [PATCH 4/9] Update dim name comparator in TopNNumericResultBuilder to better handle nulls --- .../query/topn/TopNNumericResultBuilder.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 71d5dfcbaf9..3c4a4b0db76 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -50,7 +50,26 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final List postAggs; private final PriorityQueue pQueue; private final Comparator dimValComparator; - private final Comparator dimNameComparator; + private static final Comparator dimNameComparator = new Comparator() + { + @Override + public int compare(String o1, String o2) + { + int retval; + if (null == o1) { + if(null == o2){ + retval = 0; + }else { + retval = -1; + } + } else if (null == o2) { + retval = 1; + } else { + retval = o1.compareTo(o2); + } + return retval; + } + }; private final int threshold; private final Comparator metricComparator; @@ -71,22 +90,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); this.threshold = threshold; this.metricComparator = comparator; - this.dimNameComparator = new Comparator() - { - @Override - public int compare(String o1, String o2) - { - int retval; - if (o1 == null) { - retval = -1; - } else if (o2 == null) { - retval = 1; - } else { - retval = o1.compareTo(o2); - } - return retval; - } - }; this.dimValComparator = new Comparator() { @Override From b4893b4490f1a6795abec5fd1e3431e5b11cba33 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 10:56:49 -0800 Subject: [PATCH 5/9] Pre allocate LinkedHashMap in TopNNumericResultBuilder --- .../java/io/druid/query/topn/TopNNumericResultBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java 
b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index 3c4a4b0db76..ab26392b809 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -21,7 +21,6 @@ package io.druid.query.topn; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; @@ -32,6 +31,7 @@ import org.joda.time.DateTime; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -115,7 +115,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder Object[] metricVals ) { - final Map metricValues = Maps.newLinkedHashMap(); + final Map metricValues = new LinkedHashMap<>(metricVals.length + postAggs.size()); metricValues.put(dimSpec.getOutputName(), dimName); From 75c79e75627bef9a2b1228a56d1cd05fd67f57a4 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 11:01:46 -0800 Subject: [PATCH 6/9] Added comments in TopNNumericResultBuilder --- .../main/java/io/druid/query/topn/TopNNumericResultBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index ab26392b809..9c289ed3496 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -105,6 +105,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder } }; + // The logic in addEntry first adds, then removes if needed. So it can at any point have up to threshold + 1 entries. 
pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator); } From 6af18931e52de7305fa65cfc5d404c738023eea2 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 11:08:23 -0800 Subject: [PATCH 7/9] Moved processing/pom version into master pom --- pom.xml | 6 ++++++ processing/pom.xml | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1ca0ace022d..194feb4d810 100644 --- a/pom.xml +++ b/pom.xml @@ -461,6 +461,12 @@ 4.11 test + + com.carrotsearch + junit-benchmarks + 0.7.2 + test + com.google.caliper caliper diff --git a/processing/pom.xml b/processing/pom.xml index 3fc7fd6242e..dd61b3d55f3 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -103,7 +103,6 @@ com.carrotsearch junit-benchmarks 0.7.2 - test org.easymock From f5f48c77566f68b8931b2fd25c2877853a912131 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 7 Nov 2014 11:09:57 -0800 Subject: [PATCH 8/9] Moved pom version for reals this time --- processing/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/pom.xml b/processing/pom.xml index dd61b3d55f3..5455735499a 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -102,7 +102,7 @@ com.carrotsearch junit-benchmarks - 0.7.2 + test org.easymock From 17d32a256122230129d65e7831c00b0056c2b738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 10 Nov 2014 12:49:54 -0800 Subject: [PATCH 9/9] try to benefit from out of order execution --- .../druid/query/topn/PooledTopNAlgorithm.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 6f0c527fd1b..3a5af0a9146 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -164,10 +164,15 @@ public
class PooledTopNAlgorithm offset += aggregatorSizes[j]; } + final int nAggregators = theAggregators.length; + final int extra = nAggregators - (nAggregators % 4) - 1; + final int ub = (nAggregators / 4) * 4; + while (!cursor.isDone()) { final IndexedInts dimValues = dimSelector.getRow(); - for (int i = 0; i < dimValues.size(); ++i) { + final int size = dimValues.size(); + for (int i = 0; i < size; ++i) { final int dimIndex = dimValues.get(i); int position = positions[dimIndex]; if (SKIP_POSITION_VALUE == position) { @@ -176,18 +181,21 @@ public class PooledTopNAlgorithm if (INIT_POSITION_VALUE == position) { positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; position = positions[dimIndex]; - for (int j = 0; j < theAggregators.length; ++j) { + for (int j = 0; j < nAggregators; ++j) { theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]); } position = positions[dimIndex]; } - for (int j = 0; j < theAggregators.length; ++j) { + for (int j = 0; j < ub; j += 4) { + theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); + theAggregators[j+1].aggregate(resultsBuf, position + aggregatorOffsets[j+1]); + theAggregators[j+2].aggregate(resultsBuf, position + aggregatorOffsets[j+2]); + theAggregators[j+3].aggregate(resultsBuf, position + aggregatorOffsets[j+3]); + } + for(int j = extra; j < nAggregators; ++j) { theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); } - - } - cursor.advance(); } }