HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)

* HllSketchMergeBufferAggregator: Speed up init by copying prebuilt sketch.

* Remove useless writableRegion call.

* POM variables.

* Fix missing reposition.

* Apply similar optimization to HllSketchBuildBufferAggregator.

* Rename emptySketch -> emptyUnion in merge flavor.

* Adjustments based on review.

* Comment update.

* Additional updates.

* Comment push.
This commit is contained in:
Gian Merlino 2019-07-31 08:18:42 -07:00 committed by GitHub
parent 15dd3d7256
commit 63461311f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 194 additions and 10 deletions

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.yahoo.sketches.hll.HllSketch;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
/**
 * JMH throughput benchmark for the {@link BufferAggregator} produced by
 * {@link HllSketchMergeAggregatorFactory}.
 *
 * Three scenarios are measured, all against the same direct buffer at position zero:
 * initialization alone, initialization followed by a read of the aggregated object, and
 * initialization followed by a serialize/deserialize round trip of that object.
 */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@Fork(1)
@State(Scope.Benchmark)
public class DataSketchesHllBenchmark
{
  /**
   * Offset within {@link #buf} that every benchmark method initializes and reads.
   */
  private static final int POSITION = 0;

  private final AggregatorFactory aggregatorFactory = new HllSketchMergeAggregatorFactory(
      "hll",
      "hll",
      null,
      null,
      false
  );

  /**
   * Off-heap scratch space, sized to the factory's reported maximum intermediate size.
   */
  private final ByteBuffer buf = ByteBuffer.allocateDirect(aggregatorFactory.getMaxIntermediateSize());

  private BufferAggregator aggregator;

  /**
   * Builds the aggregator under test once per trial, handing the factory a selector source
   * that yields {@code null} for every request.
   */
  @Setup(Level.Trial)
  public void setUp()
  {
    aggregator = aggregatorFactory.factorizeBuffered(new NullColumnSelectorFactory());
  }

  /**
   * Closes the aggregator at the end of the trial and drops the reference to it.
   */
  @TearDown(Level.Trial)
  public void tearDown()
  {
    aggregator.close();
    aggregator = null;
  }

  /**
   * Measures initialization only. The {@link Blackhole} argument is accepted but not used.
   */
  @Benchmark
  public void init(Blackhole bh)
  {
    aggregator.init(buf, POSITION);
  }

  /**
   * Measures initialization followed by fetching the aggregated object.
   *
   * @return whatever the aggregator reports for the freshly initialized position
   */
  @Benchmark
  public Object initAndGet()
  {
    aggregator.init(buf, POSITION);
    final Object aggregated = aggregator.get(buf, POSITION);
    return aggregated;
  }

  /**
   * Measures initialization, retrieval as an {@link HllSketch}, conversion to a compact byte
   * array, and deserialization of those bytes through the aggregator factory.
   *
   * @return the object produced by the factory's {@code deserialize} call
   */
  @Benchmark
  public Object initAndSerde()
  {
    aggregator.init(buf, POSITION);
    final HllSketch sketch = (HllSketch) aggregator.get(buf, POSITION);
    final byte[] compactBytes = sketch.toCompactByteArray();
    return aggregatorFactory.deserialize(compactBytes);
  }

  /**
   * Placeholder {@link ColumnSelectorFactory} whose every method returns {@code null}. None of
   * the benchmark methods in this class call {@code aggregate}; they only call {@code init} and
   * {@code get} on the aggregator.
   */
  private static class NullColumnSelectorFactory implements ColumnSelectorFactory
  {
    @Override
    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
    {
      return null;
    }

    @Override
    public ColumnValueSelector makeColumnValueSelector(String columnName)
    {
      return null;
    }

    @Nullable
    @Override
    public ColumnCapabilities getColumnCapabilities(String column)
    {
      return null;
    }
  }
}

View File

@ -34,11 +34,16 @@
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<datasketches.core.version>0.13.4</datasketches.core.version>
<datasketches.memory.version>0.12.2</datasketches.memory.version>
</properties>
<dependencies>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.13.4</version>
<version>${datasketches.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@ -46,6 +51,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>memory</artifactId>
<version>${datasketches.memory.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Striped;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.hll.HllSketch;
import com.yahoo.sketches.hll.TgtHllType;
import com.yahoo.sketches.hll.Union;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.BufferAggregator;
@ -42,7 +43,9 @@ import java.util.concurrent.locks.ReadWriteLock;
public class HllSketchBuildBufferAggregator implements BufferAggregator
{
/** for locking per buffer position (power of 2 to make index computation faster) */
/**
* for locking per buffer position (power of 2 to make index computation faster)
*/
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<Object> selector;
@ -53,6 +56,13 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
* {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
*/
private final byte[] emptySketch;
public HllSketchBuildBufferAggregator(
final ColumnValueSelector<Object> selector,
final int lgK,
@ -64,13 +74,29 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
this.lgK = lgK;
this.tgtHllType = tgtHllType;
this.size = size;
this.emptySketch = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
}
@Override
public void init(final ByteBuffer buf, final int position)
{
// Copy prebuilt empty sketch object.
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptySketch);
}
finally {
buf.position(oldPosition);
}
// Add an HllSketch for this chunk to our sketchCache.
final WritableMemory mem = getMemory(buf).writableRegion(position, size);
putSketchIntoCache(buf, position, new HllSketch(lgK, tgtHllType, mem));
putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
}
/**
@ -162,7 +188,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
/**
* compute lock index to avoid boxing in Striped.get() call
*
* @param position
*
* @return index
*/
static int lockIndex(final int position)
@ -172,7 +200,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
/**
* see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
*
* @param hashCode
*
* @return smeared hashCode
*/
private static int smear(int hashCode)

View File

@ -40,7 +40,9 @@ import java.util.concurrent.locks.ReadWriteLock;
public class HllSketchMergeBufferAggregator implements BufferAggregator
{
/** for locking per buffer position (power of 2 to make index computation faster) */
/**
* for locking per buffer position (power of 2 to make index computation faster)
*/
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<HllSketch> selector;
@ -49,6 +51,13 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
private final int size;
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
* {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
*/
private final byte[] emptyUnion;
public HllSketchMergeBufferAggregator(
final ColumnValueSelector<HllSketch> selector,
final int lgK,
@ -60,17 +69,29 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
this.lgK = lgK;
this.tgtHllType = tgtHllType;
this.size = size;
this.emptyUnion = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
new Union(lgK, WritableMemory.wrap(emptyUnion));
}
@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Override
public void init(final ByteBuffer buf, final int position)
{
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
// Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory.
// The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move.
// So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings.
new Union(lgK, mem);
// Copy prebuilt empty union object.
// Not necessary to cache a Union wrapper around the initialized memory, because:
// - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
// - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
// max size and therefore do not need to be potentially moved in-heap.
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptyUnion);
}
finally {
buf.position(oldPosition);
}
}
/**