Multiple speed improvements revolving around topN with HLL

Change serializer / deserializer for HyperLogLog
* Changed DirectDruidClient's InputStream handling. Is now ~10% faster for data heavy queries, and has lower variance in execution speed.
* Changed HLL Collector's toByteStream() method to be better optimized for small values. Is notably faster for small result quantities which fall into the sparse HLL bucket codepath.
  * No change for dense HLL which just uses a direct bytestream of the underlying byte data.

TopNNumericResultBuilder semi-aggressive loop unrolling for metricVals

Benchmark for HLL for sparse packing (small HLL bucket population):
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[0]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 216, GC.time: 0.42, time.total: 15.96, time.warmup: 0.22, time.bench: 15.74
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[1]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 217, GC.time: 0.45, time.total: 13.87, time.warmup: 0.02, time.bench: 13.85
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[2]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 55, GC.time: 0.16, time.total: 4.13, time.warmup: 0.00, time.bench: 4.12
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[3]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 55, GC.time: 0.16, time.total: 4.30, time.warmup: 0.00, time.bench: 4.30
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[4]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 8, GC.time: 0.03, time.total: 1.10, time.warmup: 0.00, time.bench: 1.09
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[5]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 8, GC.time: 0.03, time.total: 0.72, time.warmup: 0.00, time.bench: 0.72
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[6]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 1, GC.time: 0.00, time.total: 0.60, time.warmup: 0.00, time.bench: 0.60
HyperLogLogSerdeBenchmarkTest.benchmarkToByteBuffer[7]: [measured 100000 out of 100100 rounds, threads: 1 (sequential)]
 round: 0.00 [+- 0.00], round.block: 0.00 [+- 0.00], round.gc: 0.00 [+- 0.00], GC.calls: 2, GC.time: 0.01, time.total: 0.26, time.warmup: 0.00, time.bench: 0.25

Updates to HyperLogLogCollector toByteBuffer() based on code review

Removed changes from DirectDruidClient from this branch and put it in another branch.

Changed HyperLogLogCollector to have protected getters and setters

Remove unused ByteOrder from HyperLogLogCollector

Copyright header on HyperLogLogSerdeBenchmarkTest

Now with less ass!

Reformat in TopNNumericResultsBuilder. No code change

Removed unused import in HyperLogLogCollector

Replace AppendableByteArrayInputStream in DirectDruidClient
* Replace with SequenceInputStream fueled by an enumeration of ChannelBufferInputStream which directly wrap the response context ChannelBuffer

Modify TopNQueryQueryToolChest to use Arrays instead of Lists

Modify TopNQueryQueryToolChest to use Arrays instead of Lists

Revert accidental changes to DirectDruidClient

They should be in another merge request:
https://github.com/metamx/druid/pull/893

Fixes from code review
* Extracting names from AggregatorFactory classes now done with TopNQueryQueryToolChest.extractFactoryName
* Renamed variable in TopNNumericResultBuilder
This commit is contained in:
Charles Allen 2014-11-18 22:07:18 -08:00
parent fe0a56cad6
commit 70e3108282
4 changed files with 383 additions and 44 deletions

View File

@ -28,20 +28,20 @@ import java.nio.ByteBuffer;
/** /**
* Implements the HyperLogLog cardinality estimator described in: * Implements the HyperLogLog cardinality estimator described in:
* *
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
* *
* Run this code to see a simple indication of expected errors based on different m values: * Run this code to see a simple indication of expected errors based on different m values:
* *
* <code> * <code>
* for (int i = 1; i &lt; 20; ++i) { * for (int i = 1; i &lt; 20; ++i) {
* System.out.printf("i[%,d], val[%,d] =&gt; error[%f%%]%n", i, 2 &lt;&lt; i, 104 / Math.sqrt(2 &lt;&lt; i)); * System.out.printf("i[%,d], val[%,d] =&gt; error[%f%%]%n", i, 2 &lt;&lt; i, 104 / Math.sqrt(2 &lt;&lt; i));
* } * }
* </code> * </code>
* *
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
* only one thread is ever calling methods on it. * only one thread is ever calling methods on it.
* *
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
*/ */
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector> public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
@ -307,7 +307,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
} else if (positionOf1 > (registerOffset + range)) { } else if (positionOf1 > (registerOffset + range)) {
final byte currMax = getMaxOverflowValue(); final byte currMax = getMaxOverflowValue();
if (positionOf1 > currMax) { if (positionOf1 > currMax) {
if(currMax <= (registerOffset + range)) { if (currMax <= (registerOffset + range)) {
// this could be optimized by having an add without sanity checks // this could be optimized by having an add without sanity checks
add(getMaxOverflowRegister(), currMax); add(getMaxOverflowRegister(), currMax);
} }
@ -368,7 +368,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
otherBuffer.position(other.getPayloadBytePosition()); otherBuffer.position(other.getPayloadBytePosition());
if (isSparse(otherBuffer)) { if (isSparse(otherBuffer)) {
while(otherBuffer.hasRemaining()) { while (otherBuffer.hasRemaining()) {
final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes(); final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister( numNonZero += mergeAndStoreByteRegister(
storageBuffer, storageBuffer,
@ -416,23 +416,31 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
public ByteBuffer toByteBuffer() public ByteBuffer toByteBuffer()
{ {
short numNonZeroRegisters = getNumNonZeroRegisters();
final short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely // store sparsely
if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) { if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]); final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal); setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset()); setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters); setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue()); setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister()); setMaxOverflowRegister(retVal, getMaxOverflowRegister());
int startPosition = getPayloadBytePosition(); final int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal)); retVal.position(getPayloadBytePosition(retVal));
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
if (storageBuffer.get(i) != 0) { final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS];
retVal.putShort((short) (0xffff & (i - initPosition))); ByteBuffer roStorageBuffer = storageBuffer.asReadOnlyBuffer();
retVal.put(storageBuffer.get(i)); roStorageBuffer.position(startPosition);
roStorageBuffer.get(zipperBuffer);
for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) {
if (zipperBuffer[i] != 0) {
final short val = (short) (0xffff & (i + startPosition - initPosition));
retVal.putShort(val);
retVal.put(zipperBuffer[i]);
} }
} }
retVal.rewind(); retVal.rewind();
@ -507,12 +515,12 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return false; return false;
} }
if(storageBuffer == null && otherBuffer == null) { if (storageBuffer == null && otherBuffer == null) {
return true; return true;
} }
final ByteBuffer denseStorageBuffer; final ByteBuffer denseStorageBuffer;
if(storageBuffer.remaining() != getNumBytesForDenseStorage()) { if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer); HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
denseCollector.convertToDenseStorage(); denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer; denseStorageBuffer = denseCollector.storageBuffer;
@ -520,7 +528,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
denseStorageBuffer = storageBuffer; denseStorageBuffer = storageBuffer;
} }
if(otherBuffer.remaining() != getNumBytesForDenseStorage()) { if (otherBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer); HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
otherCollector.convertToDenseStorage(); otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer; otherBuffer = otherCollector.storageBuffer;

View File

@ -20,7 +20,9 @@
package io.druid.query.topn; package io.druid.query.topn;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.AggregatorUtil;
@ -28,10 +30,10 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue; import java.util.PriorityQueue;
@ -46,10 +48,10 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
private final DateTime timestamp; private final DateTime timestamp;
private final DimensionSpec dimSpec; private final DimensionSpec dimSpec;
private final String metricName; private final String metricName;
private final List<AggregatorFactory> aggFactories;
private final List<PostAggregator> postAggs; private final List<PostAggregator> postAggs;
private final PriorityQueue<DimValHolder> pQueue; private final PriorityQueue<DimValHolder> pQueue;
private final Comparator<DimValHolder> dimValComparator; private final Comparator<DimValHolder> dimValComparator;
private final String[] aggFactoryNames;
private static final Comparator<String> dimNameComparator = new Comparator<String>() private static final Comparator<String> dimNameComparator = new Comparator<String>()
{ {
@Override @Override
@ -86,7 +88,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
this.timestamp = timestamp; this.timestamp = timestamp;
this.dimSpec = dimSpec; this.dimSpec = dimSpec;
this.metricName = metricName; this.metricName = metricName;
this.aggFactories = aggFactories; this.aggFactoryNames = TopNQueryQueryToolChest.extractFactoryName(aggFactories);
this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
this.threshold = threshold; this.threshold = threshold;
this.metricComparator = comparator; this.metricComparator = comparator;
@ -109,6 +112,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator); pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
} }
private static final int LOOP_UNROLL_COUNT = 8;
@Override @Override
public TopNNumericResultBuilder addEntry( public TopNNumericResultBuilder addEntry(
String dimName, String dimName,
@ -116,15 +121,45 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
Object[] metricVals Object[] metricVals
) )
{ {
final Map<String, Object> metricValues = new LinkedHashMap<>(metricVals.length + postAggs.size()); Preconditions.checkArgument(
metricVals.length == aggFactoryNames.length,
"metricVals must be the same length as aggFactories"
);
final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + postAggs.size() + 1);
metricValues.put(dimSpec.getOutputName(), dimName); metricValues.put(dimSpec.getOutputName(), dimName);
Iterator<AggregatorFactory> aggFactoryIter = aggFactories.iterator(); final int extra = metricVals.length % LOOP_UNROLL_COUNT;
for (Object metricVal : metricVals) {
metricValues.put(aggFactoryIter.next().getName(), metricVal); switch (extra) {
case 7:
metricValues.put(aggFactoryNames[6], metricVals[6]);
case 6:
metricValues.put(aggFactoryNames[5], metricVals[5]);
case 5:
metricValues.put(aggFactoryNames[4], metricVals[4]);
case 4:
metricValues.put(aggFactoryNames[3], metricVals[3]);
case 3:
metricValues.put(aggFactoryNames[2], metricVals[2]);
case 2:
metricValues.put(aggFactoryNames[1], metricVals[1]);
case 1:
metricValues.put(aggFactoryNames[0], metricVals[0]);
}
for (int i = extra; i < metricVals.length; i += LOOP_UNROLL_COUNT) {
metricValues.put(aggFactoryNames[i + 0], metricVals[i + 0]);
metricValues.put(aggFactoryNames[i + 1], metricVals[i + 1]);
metricValues.put(aggFactoryNames[i + 2], metricVals[i + 2]);
metricValues.put(aggFactoryNames[i + 3], metricVals[i + 3]);
metricValues.put(aggFactoryNames[i + 4], metricVals[i + 4]);
metricValues.put(aggFactoryNames[i + 5], metricVals[i + 5]);
metricValues.put(aggFactoryNames[i + 6], metricVals[i + 6]);
metricValues.put(aggFactoryNames[i + 7], metricVals[i + 7]);
} }
// Order matters here, do not unroll
for (PostAggregator postAgg : postAggs) { for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
} }

View File

@ -36,7 +36,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.BySegmentResultValue; import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
@ -54,6 +53,7 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -80,6 +80,20 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
this.config = config; this.config = config;
} }
protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories){
return Lists.transform(
aggregatorFactories, new Function<AggregatorFactory, String>()
{
@Nullable
@Override
public String apply(@Nullable AggregatorFactory input)
{
return input.getName();
}
}
).toArray(new String[0]);
}
private static List<PostAggregator> prunePostAggregators(TopNQuery query) private static List<PostAggregator> prunePostAggregators(TopNQuery query)
{ {
return AggregatorUtil.pruneDependentPostAgg( return AggregatorUtil.pruneDependentPostAgg(
@ -156,7 +170,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>() return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{ {
private String dimension = query.getDimensionSpec().getOutputName(); private String dimension = query.getDimensionSpec().getOutputName();
final List<PostAggregator> prunedAggs = prunePostAggregators(query); private final List<PostAggregator> prunedAggs = prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
@Override @Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result) public Result<TopNResultValue> apply(Result<TopNResultValue> result)
@ -169,16 +186,24 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override @Override
public Map<String, Object> apply(DimensionAndMetricValueExtractor input) public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{ {
final Map<String, Object> values = Maps.newHashMap(); final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
for (AggregatorFactory agg : query.getAggregatorSpecs()) { aggregatorFactories.length
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); + prunedAggs.size()
+ 1
);
// JVM couldn't optimize this too well, so this is helping it out a bit.
for (int i = 0; i < aggregatorFactories.length; ++i) {
final String aggName = aggFactoryNames[i];
values.put(aggName, fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
} }
for (PostAggregator postAgg : prunedAggs) { for (PostAggregator postAgg : prunedAggs) {
Object calculatedPostAgg = input.getMetric(postAgg.getName()); final String name = postAgg.getName();
Object calculatedPostAgg = input.getMetric(name);
if (calculatedPostAgg != null) { if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg); values.put(name, calculatedPostAgg);
} else { } else {
values.put(postAgg.getName(), postAgg.compute(values)); values.put(name, postAgg.compute(values));
} }
} }
values.put(dimension, input.getDimensionValue(dimension)); values.put(dimension, input.getDimensionValue(dimension));
@ -205,6 +230,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>() return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{ {
private String dimension = query.getDimensionSpec().getOutputName(); private String dimension = query.getDimensionSpec().getOutputName();
private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
private final PostAggregator[] postAggregators = query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
@Override @Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result) public Result<TopNResultValue> apply(Result<TopNResultValue> result)
@ -217,13 +246,19 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override @Override
public Map<String, Object> apply(DimensionAndMetricValueExtractor input) public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{ {
final Map<String, Object> values = Maps.newHashMap(); final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
aggregatorFactories.length
+ query.getPostAggregatorSpecs().size()
+ 1
);
// put non finalized aggregators for calculating dependent post Aggregators // put non finalized aggregators for calculating dependent post Aggregators
for (AggregatorFactory agg : query.getAggregatorSpecs()) { // JVM is dumb about optimization
values.put(agg.getName(), input.getMetric(agg.getName())); for( int i = 0; i < aggFactoryNames.length; ++i){
final String name = aggFactoryNames[i];
values.put(name, input.getMetric(name));
} }
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { for (PostAggregator postAgg : postAggregators) {
Object calculatedPostAgg = input.getMetric(postAgg.getName()); Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) { if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg); values.put(postAgg.getName(), calculatedPostAgg);
@ -231,8 +266,9 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
values.put(postAgg.getName(), postAgg.compute(values)); values.put(postAgg.getName(), postAgg.compute(values));
} }
} }
for (AggregatorFactory agg : query.getAggregatorSpecs()) { for( int i = 0; i < aggFactoryNames.length; ++i){
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName()))); final String name = aggFactoryNames[i];
values.put(name, fn.manipulate(aggregatorFactories[i], input.getMetric(name)));
} }
values.put(dimension, input.getDimensionValue(dimension)); values.put(dimension, input.getDimensionValue(dimension));
@ -262,7 +298,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
{ {
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>() return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
{ {
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs(); private final List<AggregatorFactory> aggs = Lists.newArrayList(query.getAggregatorSpecs());
private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg( private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg(
query.getPostAggregatorSpecs(), query.getPostAggregatorSpecs(),
query.getTopNMetricSpec() query.getTopNMetricSpec()
@ -306,6 +342,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
{ {
return new Function<Result<TopNResultValue>, Object>() return new Function<Result<TopNResultValue>, Object>()
{ {
private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
@Override @Override
public Object apply(final Result<TopNResultValue> input) public Object apply(final Result<TopNResultValue> input)
{ {
@ -315,10 +353,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
// make sure to preserve timezone information when caching results // make sure to preserve timezone information when caching results
retVal.add(input.getTimestamp().getMillis()); retVal.add(input.getTimestamp().getMillis());
for (DimensionAndMetricValueExtractor result : results) { for (DimensionAndMetricValueExtractor result : results) {
List<Object> vals = Lists.newArrayListWithCapacity(aggs.size() + 2); List<Object> vals = Lists.newArrayListWithCapacity(aggFactoryNames.length + 2);
vals.add(result.getStringDimensionValue(query.getDimensionSpec().getOutputName())); vals.add(result.getStringDimensionValue(query.getDimensionSpec().getOutputName()));
for (AggregatorFactory agg : aggs) { for (String aggName : aggFactoryNames) {
vals.add(result.getMetric(agg.getName())); vals.add(result.getMetric(aggName));
} }
retVal.add(vals); retVal.add(vals);
} }

View File

@ -0,0 +1,258 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.carrotsearch.junitbenchmarks.BenchmarkRule;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
/**
*
*/
@RunWith(Parameterized.class)
@Ignore // Don't need to run every time
public class HyperLogLogSerdeBenchmarkTest extends AbstractBenchmark
{
private final HyperLogLogCollector collector;
private final long NUM_HASHES;
public HyperLogLogSerdeBenchmarkTest(final HyperLogLogCollector collector, Long num_hashes)
{
this.collector = collector;
this.NUM_HASHES = num_hashes;
}
private static final HashFunction hashFunction = Hashing.murmur3_128();
@Parameterized.Parameters
public static Collection<Object[]> getParameters()
{
return ImmutableList.<Object[]>of(
(Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 10)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 10)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 10)).toArray(),
(Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 8)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 8)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 8)).toArray(),
(Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 5)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 5)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 5)).toArray(),
(Object[]) Arrays.asList(new priorByteBufferSerializer(), new Long(1 << 2)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializer(), new Long(1 << 2)).toArray(),
(Object[]) Arrays.asList(new newByteBufferSerializerWithPuts(), new Long(1 << 2)).toArray()
);
}
private static final class priorByteBufferSerializer extends HLLCV1
{
@Override
public ByteBuffer toByteBuffer()
{
final ByteBuffer myBuffer = getStorageBuffer();
final int initialPosition = getInitPosition();
short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely
if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal));
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
if (myBuffer.get(i) != 0) {
retVal.putShort((short) (0xffff & (i - initialPosition)));
retVal.put(myBuffer.get(i));
}
}
retVal.rewind();
return retVal.asReadOnlyBuffer();
}
return myBuffer.asReadOnlyBuffer();
}
}
private static final class newByteBufferSerializer extends HLLCV1
{
@Override
public ByteBuffer toByteBuffer()
{
final ByteBuffer myBuffer = getStorageBuffer();
final int initialPosition = getInitPosition();
final short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely
if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
final int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal));
final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS];
ByteBuffer roStorageBuffer = myBuffer.asReadOnlyBuffer();
roStorageBuffer.position(startPosition);
roStorageBuffer.get(zipperBuffer);
final ByteOrder byteOrder = retVal.order();
final byte[] tempBuffer = new byte[numNonZeroRegisters * 3];
int outBufferPos = 0;
for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) {
if (zipperBuffer[i] != 0) {
final short val = (short) (0xffff & (i + startPosition - initialPosition));
if(byteOrder.equals(ByteOrder.LITTLE_ENDIAN)){
tempBuffer[outBufferPos + 0] = (byte) (0xff & val);
tempBuffer[outBufferPos + 1] = (byte) (0xff & (val>>8));
}else{
tempBuffer[outBufferPos + 1] = (byte) (0xff & val);
tempBuffer[outBufferPos + 0] = (byte) (0xff & (val>>8));
}
tempBuffer[outBufferPos + 2] = zipperBuffer[i];
outBufferPos += 3;
}
}
retVal.put(tempBuffer);
retVal.rewind();
return retVal.asReadOnlyBuffer();
}
return myBuffer.asReadOnlyBuffer();
}
}
private static final class newByteBufferSerializerWithPuts extends HLLCV1
{
@Override
public ByteBuffer toByteBuffer()
{
final ByteBuffer myBuffer = getStorageBuffer();
final int initialPosition = getInitPosition();
final short numNonZeroRegisters = getNumNonZeroRegisters();
// store sparsely
if (myBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) {
final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]);
setVersion(retVal);
setRegisterOffset(retVal, getRegisterOffset());
setNumNonZeroRegisters(retVal, numNonZeroRegisters);
setMaxOverflowValue(retVal, getMaxOverflowValue());
setMaxOverflowRegister(retVal, getMaxOverflowRegister());
final int startPosition = getPayloadBytePosition();
retVal.position(getPayloadBytePosition(retVal));
final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS];
ByteBuffer roStorageBuffer = myBuffer.asReadOnlyBuffer();
roStorageBuffer.position(startPosition);
roStorageBuffer.get(zipperBuffer);
final ByteOrder byteOrder = retVal.order();
for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) {
if (zipperBuffer[i] != 0) {
final short val = (short) (0xffff & (i + startPosition - initialPosition));
retVal.putShort(val);
retVal.put(zipperBuffer[i]);
}
}
retVal.rewind();
return retVal.asReadOnlyBuffer();
}
return myBuffer.asReadOnlyBuffer();
}
}
//--------------------------------------------------------------------------------------------------------------------
private void fillCollector(HyperLogLogCollector collector)
{
Random rand = new Random(758190);
for (long i = 0; i < NUM_HASHES; ++i) {
collector.add(hashFunction.hashLong(rand.nextLong()).asBytes());
}
}
private static HashCode getHash(final ByteBuffer byteBuffer)
{
Hasher hasher = hashFunction.newHasher();
while (byteBuffer.position() < byteBuffer.limit()) {
hasher.putByte(byteBuffer.get());
}
return hasher.hash();
}
@BeforeClass
public static void setupHash()
{
}
@Before
public void setup()
{
fillCollector(collector);
}
volatile HashCode hashCode;
@BenchmarkOptions(benchmarkRounds = 100000, warmupRounds = 100)
@Test
public void benchmarkToByteBuffer()
{
hashCode = getHash(collector.toByteBuffer());
}
}