From 70e31082827772fc39cbecc713f9bbd0d4de1647 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Tue, 18 Nov 2014 22:07:18 -0800
Subject: [PATCH] Multiple speed improvements revolving around topN with HLL

Change serializer / deserializer for HyperLogLog

* Changed DirectDruidClient's InputStream handling. It is now ~10% faster for data-heavy queries and has lower variance in execution speed.
* Changed the HLL collector's toByteBuffer() method to be better optimized for small values. It is notably faster for small result quantities, which fall into the sparse HLL bucket codepath.
* No change for dense HLL, which just uses a direct byte stream of the underlying byte data.

TopNNumericResultBuilder: semi-aggressive loop unrolling for metricVals

Benchmark for HLL sparse packing (small HLL bucket population):

HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[0]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 216, GC.time: 0.42, time.total: 15.96, time.warmup: 0.22, time.bench: 15.74
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[1]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 217, GC.time: 0.45, time.total: 13.87, time.warmup: 0.02, time.bench: 13.85
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[2]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 55, GC.time: 0.16, time.total: 4.13, time.warmup: 0.00, time.bench: 4.12
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[3]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 55, GC.time: 0.16, time.total: 4.30, time.warmup: 0.00, time.bench: 4.30
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[4]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 8, GC.time: 0.03, time.total: 1.10, time.warmup: 0.00, time.bench: 1.09
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[5]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 8, GC.time: 0.03, time.total: 0.72, time.warmup: 0.00, time.bench: 0.72
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[6]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 1, GC.time: 0.00, time.total: 0.60, time.warmup: 0.00, time.bench: 0.60
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[7]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 2, GC.time: 0.01, time.total: 0.26, time.warmup: 0.00, time.bench: 0.25
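The sparse codepath benchmarked above writes each non-zero register as a 2-byte position plus a 1-byte value instead of emitting the full dense register array. A minimal standalone sketch of that encoding idea (hypothetical names, not code from this patch):

    // Sketch of sparse (position, value) register encoding, assuming a dense byte[] of registers.
    import java.nio.ByteBuffer;

    class SparseEncodeSketch
    {
      static ByteBuffer sparseEncode(byte[] denseRegisters, int numNonZero)
      {
        // 3 bytes per non-zero register: short position + byte value.
        final ByteBuffer out = ByteBuffer.allocate(numNonZero * 3);
        for (int i = 0; i < denseRegisters.length; ++i) {
          if (denseRegisters[i] != 0) {
            out.putShort((short) i);    // register position
            out.put(denseRegisters[i]); // register value
          }
        }
        out.flip();
        return out;
      }
    }

This only pays off while few registers are populated; past the collector's dense threshold the dense layout is smaller, which is why the dense path is unchanged.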
Updates to HyperLogLogCollector toByteBuffer() based on code review

Removed changes to DirectDruidClient from this branch and put them in another branch.

Changed HyperLogLogCollector to have protected getters and setters

Remove unused ByteOrder from HyperLogLogCollector

Copyright header on HyperLogLogSerdeBenchmarkTest

General cleanup.

Reformat in TopNNumericResultBuilder; no code change.

Removed unused import in HyperLogLogCollector

Replace AppendableByteArrayInputStream in DirectDruidClient

* Replaced with a SequenceInputStream fed by an enumeration of ChannelBufferInputStreams, which directly wrap the response context ChannelBuffers.

Modify TopNQueryQueryToolChest to use Arrays instead of Lists

Revert accidental changes to DirectDruidClient; they should be in another merge request:
https://github.com/metamx/druid/pull/893

Fixes from code review

* Extracting names from AggregatorFactory classes is now done with TopNQueryQueryToolChest.extractFactoryName
* Renamed variable in TopNNumericResultBuilder
---
 .../hyperloglog/HyperLogLogCollector.java     |  46 ++--
 .../query/topn/TopNNumericResultBuilder.java  |  49 +++-
 .../query/topn/TopNQueryQueryToolChest.java   |  74 +++--
 .../HyperLogLogSerdeBenchmarkTest.java        | 258 ++++++++++++++++++
 4 files changed, 383 insertions(+), 44 deletions(-)
 create mode 100644 processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogSerdeBenchmarkTest.java
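For context on the DirectDruidClient notes above (the change itself lives in the linked pull request), the technique is to chain per-chunk streams rather than copy every chunk into one growable byte array. A minimal sketch under that assumption; ChannelBufferInputStream is the Netty 3 wrapper, and the method name here is hypothetical:

    // Sketch: expose received ChannelBuffers as one InputStream without copying.
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBufferInputStream;

    import java.io.InputStream;
    import java.io.SequenceInputStream;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ChunkedResponseSketch
    {
      static InputStream asStream(List<ChannelBuffer> receivedChunks)
      {
        final List<InputStream> streams = new ArrayList<>();
        for (ChannelBuffer buf : receivedChunks) {
          // Wraps the buffer directly; no intermediate copy is made.
          streams.add(new ChannelBufferInputStream(buf));
        }
        // Reads each chunk to exhaustion, then moves on to the next.
        return new SequenceInputStream(Collections.enumeration(streams));
      }
    }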
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
index 7ccbdac2805..7571058351a 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java
@@ -28,20 +28,20 @@ import java.nio.ByteBuffer;
 
 /**
  * Implements the HyperLogLog cardinality estimator described in:
- *
+ *
  * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
- *
+ *
  * Run this code to see a simple indication of expected errors based on different m values:
- *
+ *
- *   for (int i = 1; i < 20; ++i) {
- *     System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
- *   }
+ * for (int i = 1; i < 20; ++i) {
+ *   System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
+ * }
  *
  * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
  * only one thread is ever calling methods on it.
- *
+ *
  * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
  */
 public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
@@ -307,7 +307,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
     } else if (positionOf1 > (registerOffset + range)) {
       final byte currMax = getMaxOverflowValue();
       if (positionOf1 > currMax) {
-        if(currMax <= (registerOffset + range)) {
+        if (currMax <= (registerOffset + range)) {
           // this could be optimized by having an add without sanity checks
           add(getMaxOverflowRegister(), currMax);
         }
@@ -368,7 +368,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
[...]
diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
[...]
-  private final List<AggregatorFactory> aggFactories;
   private final List<PostAggregator> postAggs;
   private final PriorityQueue<DimValHolder> pQueue;
   private final Comparator<DimValHolder> dimValComparator;
+  private final String[] aggFactoryNames;
 
   private static final Comparator<String> dimNameComparator = new Comparator<String>()
   {
     @Override
@@ -86,7 +88,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     this.timestamp = timestamp;
     this.dimSpec = dimSpec;
     this.metricName = metricName;
-    this.aggFactories = aggFactories;
+    this.aggFactoryNames = TopNQueryQueryToolChest.extractFactoryName(aggFactories);
+
     this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
     this.threshold = threshold;
     this.metricComparator = comparator;
@@ -109,6 +112,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
     pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
   }
 
+  private static final int LOOP_UNROLL_COUNT = 8;
+
   @Override
   public TopNNumericResultBuilder addEntry(
       String dimName,
@@ -116,15 +121,45 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
       Object[] metricVals
   )
   {
-    final Map<String, Object> metricValues = new LinkedHashMap<>(metricVals.length + postAggs.size());
+    Preconditions.checkArgument(
+        metricVals.length == aggFactoryNames.length,
+        "metricVals must be the same length as aggFactories"
+    );
+
+    final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + postAggs.size() + 1);
 
     metricValues.put(dimSpec.getOutputName(), dimName);
-    Iterator<AggregatorFactory> aggFactoryIter = aggFactories.iterator();
-    for (Object metricVal : metricVals) {
-      metricValues.put(aggFactoryIter.next().getName(), metricVal);
+
+    final int extra = metricVals.length % LOOP_UNROLL_COUNT;
+
+    switch (extra) {
+      case 7:
+        metricValues.put(aggFactoryNames[6], metricVals[6]);
+      case 6:
+        metricValues.put(aggFactoryNames[5], metricVals[5]);
+      case 5:
+        metricValues.put(aggFactoryNames[4], metricVals[4]);
+      case 4:
+        metricValues.put(aggFactoryNames[3], metricVals[3]);
+      case 3:
+        metricValues.put(aggFactoryNames[2], metricVals[2]);
+      case 2:
+        metricValues.put(aggFactoryNames[1], metricVals[1]);
+      case 1:
+        metricValues.put(aggFactoryNames[0], metricVals[0]);
+    }
+    for (int i = extra; i < metricVals.length; i += LOOP_UNROLL_COUNT) {
+      metricValues.put(aggFactoryNames[i + 0], metricVals[i + 0]);
+      metricValues.put(aggFactoryNames[i + 1], metricVals[i + 1]);
+      metricValues.put(aggFactoryNames[i + 2], metricVals[i + 2]);
+      metricValues.put(aggFactoryNames[i + 3], metricVals[i + 3]);
+      metricValues.put(aggFactoryNames[i + 4], metricVals[i + 4]);
+      metricValues.put(aggFactoryNames[i + 5], metricVals[i + 5]);
+      metricValues.put(aggFactoryNames[i + 6], metricVals[i + 6]);
+      metricValues.put(aggFactoryNames[i + 7], metricVals[i + 7]);
     }
+
+    // Order matters here, do not unroll
     for (PostAggregator postAgg : postAggs) {
       metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
     }
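A note on the addEntry() change above: the switch statement relies on deliberate case fall-through (no break statements) to consume the length % 8 remainder up front, so the main loop can then process exactly eight entries per iteration. The same pattern in a minimal standalone form, with a smaller hypothetical unroll factor:

    // Sketch of the remainder-switch plus unrolled-loop pattern.
    class UnrollSketch
    {
      static final int UNROLL = 4;

      static long sum(long[] vals)
      {
        long total = 0;
        final int extra = vals.length % UNROLL;
        switch (extra) { // intentional fall-through handles the first `extra` elements
          case 3:
            total += vals[2];
          case 2:
            total += vals[1];
          case 1:
            total += vals[0];
        }
        // The remaining length is now an exact multiple of UNROLL.
        for (int i = extra; i < vals.length; i += UNROLL) {
          total += vals[i] + vals[i + 1] + vals[i + 2] + vals[i + 3];
        }
        return total;
      }
    }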
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index 39244d090c0..a16b7c5b575 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -36,7 +36,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.collections.OrderedMergeSequence;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.BySegmentResultValue;
-import io.druid.query.BySegmentResultValueClass;
 import io.druid.query.CacheStrategy;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
@@ -54,6 +53,7 @@ import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.filter.DimFilter;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -80,6 +80,20 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
 
+  protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories)
+  {
+    return Lists.transform(
+        aggregatorFactories, new Function<AggregatorFactory, String>()
+        {
+          @Nullable
+          @Override
+          public String apply(@Nullable AggregatorFactory input)
+          {
+            return input.getName();
+          }
+        }
+    ).toArray(new String[0]);
+  }
+
   private static List<PostAggregator> prunePostAggregators(TopNQuery query)
   {
     return AggregatorUtil.pruneDependentPostAgg(
@@ -156,7 +170,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
         new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
         {
           private String dimension = query.getDimensionSpec().getOutputName();
-          final List<PostAggregator> prunedAggs = prunePostAggregators(query);
+          private final List<PostAggregator> prunedAggs = prunePostAggregators(query);
+          private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
+                                                                       .toArray(new AggregatorFactory[0]);
+          private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
 
           @Override
           public Result<TopNResultValue> apply(Result<TopNResultValue> result)
@@ -169,16 +186,24 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
                     @Override
                     public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
                     {
-                      final Map<String, Object> values = Maps.newHashMap();
-                      for (AggregatorFactory agg : query.getAggregatorSpecs()) {
-                        values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
+                      final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
+                          aggregatorFactories.length
+                          + prunedAggs.size()
+                          + 1
+                      );
+                      // JVM couldn't optimize this too well, so this is helping it out a bit.
+                      for (int i = 0; i < aggregatorFactories.length; ++i) {
+                        final String aggName = aggFactoryNames[i];
+                        values.put(aggName, fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
                       }
+
                       for (PostAggregator postAgg : prunedAggs) {
-                        Object calculatedPostAgg = input.getMetric(postAgg.getName());
+                        final String name = postAgg.getName();
+                        Object calculatedPostAgg = input.getMetric(name);
                         if (calculatedPostAgg != null) {
-                          values.put(postAgg.getName(), calculatedPostAgg);
+                          values.put(name, calculatedPostAgg);
                         } else {
-                          values.put(postAgg.getName(), postAgg.compute(values));
+                          values.put(name, postAgg.compute(values));
                         }
                       }
                       values.put(dimension, input.getDimensionValue(dimension));
@@ -205,6 +230,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
         new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
         {
           private String dimension = query.getDimensionSpec().getOutputName();
+          private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
+                                                                       .toArray(new AggregatorFactory[0]);
+          private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
+          private final PostAggregator[] postAggregators = query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
 
           @Override
           public Result<TopNResultValue> apply(Result<TopNResultValue> result)
@@ -217,13 +246,19 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
                     @Override
                     public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
                     {
-                      final Map<String, Object> values = Maps.newHashMap();
+                      final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
+                          aggregatorFactories.length
+                          + query.getPostAggregatorSpecs().size()
+                          + 1
+                      );
                       // put non finalized aggregators for calculating dependent post Aggregators
-                      for (AggregatorFactory agg : query.getAggregatorSpecs()) {
-                        values.put(agg.getName(), input.getMetric(agg.getName()));
+                      // JVM is dumb about optimization
+                      for (int i = 0; i < aggFactoryNames.length; ++i) {
+                        final String name = aggFactoryNames[i];
+                        values.put(name, input.getMetric(name));
                       }
-                      for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
+                      for (PostAggregator postAgg : postAggregators) {
                         Object calculatedPostAgg = input.getMetric(postAgg.getName());
                         if (calculatedPostAgg != null) {
                           values.put(postAgg.getName(), calculatedPostAgg);
@@ -231,8 +266,9 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
     return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
     {
-      private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<AggregatorFactory> aggs = Lists.newArrayList(query.getAggregatorSpecs());
       private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg(
           query.getPostAggregatorSpecs(),
           query.getTopNMetricSpec()
@@ -306,6 +342,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
       return new Function<Result<TopNResultValue>, Object>()
       {
+        private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
+
         @Override
         public Object apply(final Result<TopNResultValue> input)
         {
@@ -315,10 +353,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
-            List<Object> vals = Lists.newArrayListWithCapacity(aggs.size() + 2);
+            List<Object> vals = Lists.newArrayListWithCapacity(aggFactoryNames.length + 2);
             vals.add(result.getStringDimensionValue(query.getDimensionSpec().getOutputName()));
-            for (AggregatorFactory agg : aggs) {
-              vals.add(result.getMetric(agg.getName()));
+            for (String aggName : aggFactoryNames) {
+              vals.add(result.getMetric(aggName));
             }
             retVal.add(vals);
           }
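The recurring pattern in the TopNQueryQueryToolChest hunks above is to hoist per-row work out of the hot loop: aggregator names are extracted once per query into a String[], and result maps are sized up front so they never rehash while being filled. A minimal sketch of that idea (HasName is a hypothetical stand-in for AggregatorFactory):

    // Sketch: precompute names once per query, reuse them for every result row.
    import com.google.common.collect.Maps;

    import java.util.List;
    import java.util.Map;

    class PrecomputeNamesSketch
    {
      interface HasName
      {
        String getName();
      }

      static String[] extractNames(List<? extends HasName> factories)
      {
        final String[] names = new String[factories.size()];
        for (int i = 0; i < names.length; ++i) {
          names[i] = factories.get(i).getName(); // virtual call once per query, not once per row
        }
        return names;
      }

      static Map<String, Object> newRow(String[] names, Object[] vals, int postAggCount)
      {
        // Sized so no rehashing occurs while the row is filled.
        final Map<String, Object> row = Maps.newHashMapWithExpectedSize(names.length + postAggCount + 1);
        for (int i = 0; i < names.length; ++i) {
          row.put(names[i], vals[i]);
        }
        return row;
      }
    }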
diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogSerdeBenchmarkTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogSerdeBenchmarkTest.java
new file mode 100644
index 00000000000..71a20057d63
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperLogLogSerdeBenchmarkTest.java
@@ -0,0 +1,258 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation.hyperloglog;
+
+import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+@Ignore // Don't need to run every time
+public class HyperLogLogSerdeBenchmarkTest extends AbstractBenchmark
+{
+  private final HyperLogLogCollector collector;
+  private final long NUM_HASHES;
+
+  public HyperLogLogSerdeBenchmarkTest(final HyperLogLogCollector collector, Long num_hashes)
+  {
+    this.collector = collector;
+    this.NUM_HASHES = num_hashes;
+  }
+
+  private static final HashFunction hashFunction = Hashing.murmur3_128();
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters()
+  {
+    return ImmutableList.of(
+        (Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 10)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 10)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 10)).toArray(),
+        (Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 8)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 8)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 8)).toArray(),
+        (Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 5)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 5)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 5)).toArray(),
+        (Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 2)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 2)).toArray(),
+        (Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 2)).toArray()
+    );
+  }
+
+  private static final class priorByteBufferSerializer extends HLLCV1
+  {
+    @Override
+    public ByteBuffer toByteBuffer()
+    {
+      final ByteBuffer myBuffer = getStorageBuffer();
+      final int initialPosition = getInitPosition();
+      short numNonZeroRegisters = getNumNonZeroRegisters();
+
+      // store sparsely
+      if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
+        ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
+        setVersion(retVal);
+        setRegisterOffset(retVal, getRegisterOffset());
+        setNumNonZeroRegisters(retVal, numNonZeroRegisters);
+        setMaxOverflowValue(retVal, getMaxOverflowValue());
+        setMaxOverflowRegister(retVal, getMaxOverflowRegister());
+
+        int startPosition = getPayloadBytePosition();
+        retVal.position(getPayloadBytePosition(retVal));
+        for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
+          if (myBuffer.get(i) != 0) {
+            retVal.putShort((short) (0xffff & (i - initialPosition)));
+            retVal.put(myBuffer.get(i));
+          }
+        }
+        retVal.rewind();
+        return retVal.asReadOnlyBuffer();
+      }
+
+      return myBuffer.asReadOnlyBuffer();
+    }
+  }
+
+  private static final class newByteBufferSerializer extends HLLCV1
+  {
+    @Override
+    public ByteBuffer toByteBuffer()
+    {
+      final ByteBuffer myBuffer = getStorageBuffer();
+      final int initialPosition = getInitPosition();
+      final short numNonZeroRegisters = getNumNonZeroRegisters();
+
+      // store sparsely
+      if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
+        final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
+        setVersion(retVal);
+        setRegisterOffset(retVal, getRegisterOffset());
+        setNumNonZeroRegisters(retVal, numNonZeroRegisters);
+        setMaxOverflowValue(retVal, getMaxOverflowValue());
+        setMaxOverflowRegister(retVal, getMaxOverflowRegister());
+
+        final int startPosition = getPayloadBytePosition();
+        retVal.position(getPayloadBytePosition(retVal));
+
+        final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS];
+        ByteBuffer roStorageBuffer = myBuffer.asReadOnlyBuffer();
+        roStorageBuffer.position(startPosition);
+        roStorageBuffer.get(zipperBuffer);
+
+        final ByteOrder byteOrder = retVal.order();
+
+        final byte[] tempBuffer = new byte[numNonZeroRegisters * 3];
+        int outBufferPos = 0;
+        for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) {
+          if (zipperBuffer[i] != 0) {
+            final short val = (short) (0xffff & (i + startPosition - initialPosition));
+            if (byteOrder.equals(ByteOrder.LITTLE_ENDIAN)) {
+              tempBuffer[outBufferPos + 0] = (byte) (0xff & val);
+              tempBuffer[outBufferPos + 1] = (byte) (0xff & (val >> 8));
+            } else {
+              tempBuffer[outBufferPos + 1] = (byte) (0xff & val);
+              tempBuffer[outBufferPos + 0] = (byte) (0xff & (val >> 8));
+            }
+            tempBuffer[outBufferPos + 2] = zipperBuffer[i];
+            outBufferPos += 3;
+          }
+        }
+        retVal.put(tempBuffer);
+        retVal.rewind();
+        return retVal.asReadOnlyBuffer();
+      }
+
+      return myBuffer.asReadOnlyBuffer();
+    }
+  }
+
+  private static final class newByteBufferSerializerWithPuts extends HLLCV1
+  {
+    @Override
+    public ByteBuffer toByteBuffer()
+    {
+      final ByteBuffer myBuffer = getStorageBuffer();
+      final int initialPosition = getInitPosition();
+      final short numNonZeroRegisters = getNumNonZeroRegisters();
+
+      // store sparsely
+      if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
+        final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
+        setVersion(retVal);
+        setRegisterOffset(retVal, getRegisterOffset());
+        setNumNonZeroRegisters(retVal, numNonZeroRegisters);
+        setMaxOverflowValue(retVal, getMaxOverflowValue());
+        setMaxOverflowRegister(retVal, getMaxOverflowRegister());
+
+        final int startPosition = getPayloadBytePosition();
+        retVal.position(getPayloadBytePosition(retVal));
+
+        final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS];
+        ByteBuffer roStorageBuffer = myBuffer.asReadOnlyBuffer();
+        roStorageBuffer.position(startPosition);
+        roStorageBuffer.get(zipperBuffer);
+
+        final ByteOrder byteOrder = retVal.order();
+
+        for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) {
+          if (zipperBuffer[i] != 0) {
+            final short val = (short) (0xffff & (i + startPosition - initialPosition));
+            retVal.putShort(val);
+            retVal.put(zipperBuffer[i]);
+          }
+        }
+        retVal.rewind();
+        return retVal.asReadOnlyBuffer();
+      }
+
+      return myBuffer.asReadOnlyBuffer();
+    }
+  }
+
+  //------------------------------------------------------------------------------------------------------------------
+
+  private void fillCollector(HyperLogLogCollector collector)
+  {
+    Random rand = new Random(758190);
+    for (long i = 0; i < NUM_HASHES; ++i) {
+      collector.add(hashFunction.hashLong(rand.nextLong()).asBytes());
+    }
+  }
+
+  private static HashCode getHash(final ByteBuffer byteBuffer)
+  {
+    Hasher hasher = hashFunction.newHasher();
+    while (byteBuffer.position() < byteBuffer.limit()) {
+      hasher.putByte(byteBuffer.get());
+    }
+    return hasher.hash();
+  }
+
+  @BeforeClass
+  public static void setupHash()
+  {
+  }
+
+  @Before
+  public void setup()
+  {
+    fillCollector(collector);
+  }
+
+  volatile HashCode hashCode;
+
+  @BenchmarkOptions(benchmarkRounds = 100000, warmupRounds = 100)
+  @Test
+  public void benchmarkToByteBuffer()
+  {
+    hashCode = getHash(collector.toByteBuffer());
+  }
+}
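One thing the benchmark above does not assert is that the three serializers emit identical bytes. A hypothetical test method along these lines could be added to the class (it reuses the class's existing imports, the inner serializer classes, and the fillCollector seed; none of this is in the patch):

    // Hypothetical addition to HyperLogLogSerdeBenchmarkTest: verify the optimized
    // serializers are byte-identical to the prior implementation.
    @Test
    public void serializersProduceIdenticalBytes()
    {
      final HyperLogLogCollector prior = new priorByteBufferSerializer();
      final HyperLogLogCollector newer = new newByteBufferSerializer();
      final HyperLogLogCollector puts = new newByteBufferSerializerWithPuts();
      final Random rand = new Random(758190); // same seed as fillCollector
      for (long i = 0; i < NUM_HASHES; ++i) {
        final byte[] hash = hashFunction.hashLong(rand.nextLong()).asBytes();
        prior.add(hash);
        newer.add(hash);
        puts.add(hash);
      }
      // ByteBuffer.equals compares remaining content, so equal buffers mean equal serialized forms.
      Assert.assertEquals(prior.toByteBuffer(), newer.toByteBuffer());
      Assert.assertEquals(prior.toByteBuffer(), puts.toByteBuffer());
    }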