From 63461311f8d38fa86d7007756f2320d7829ccd9a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 31 Jul 2019 08:18:42 -0700 Subject: [PATCH] HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194) * HllSketchMergeBufferAggregator: Speed up init by copying prebuilt sketch. * Remove useless writableRegion call. * POM variables. * Fix missing reposition. * Apply similar optimization to HllSketchBuildBufferAggregator. * Rename emptySketch -> emptyUnion in merge flavor. * Adjustments based on review. * Comment update. * Additional updates. * Comment push. --- .../benchmark/DataSketchesHllBenchmark.java | 123 ++++++++++++++++++ extensions-core/datasketches/pom.xml | 12 +- .../hll/HllSketchBuildBufferAggregator.java | 34 ++++- .../hll/HllSketchMergeBufferAggregator.java | 35 ++++- 4 files changed, 194 insertions(+), 10 deletions(-) create mode 100644 benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java new file mode 100644 index 00000000000..3493f559412 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.yahoo.sketches.hll.HllSketch; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@Fork(1) +@State(Scope.Benchmark) +public class DataSketchesHllBenchmark +{ + private final AggregatorFactory aggregatorFactory = new HllSketchMergeAggregatorFactory( + "hll", + "hll", + null, + null, + false + ); + + private final ByteBuffer buf = ByteBuffer.allocateDirect(aggregatorFactory.getMaxIntermediateSize()); + + private BufferAggregator aggregator; + + @Setup(Level.Trial) + public void setUp() + { + aggregator = aggregatorFactory.factorizeBuffered( + new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return null; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return null; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return null; + } + } + ); + } + + @TearDown(Level.Trial) + public void tearDown() + { + aggregator.close(); + aggregator = null; + } + + @Benchmark + public void init(Blackhole bh) + { + aggregator.init(buf, 0); + } + + @Benchmark + public Object initAndGet() + { + aggregator.init(buf, 0); + return aggregator.get(buf, 0); + } + + @Benchmark + public Object initAndSerde() + { + aggregator.init(buf, 0); + return aggregatorFactory.deserialize(((HllSketch) aggregator.get(buf, 0)).toCompactByteArray()); + } +} diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index db91883f1cf..151b5544202 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -34,11 +34,16 @@ ../../pom.xml + + 0.13.4 + 0.12.2 + + com.yahoo.datasketches sketches-core - 0.13.4 + ${datasketches.core.version} com.google.code.findbugs @@ -46,6 +51,11 @@ + + com.yahoo.datasketches + memory + ${datasketches.memory.version} + org.apache.commons commons-math3 diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 5532866a2d8..5789127038b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Striped; import com.yahoo.memory.WritableMemory; import com.yahoo.sketches.hll.HllSketch; import com.yahoo.sketches.hll.TgtHllType; +import com.yahoo.sketches.hll.Union; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.druid.query.aggregation.BufferAggregator; @@ -42,7 +43,9 @@ import java.util.concurrent.locks.ReadWriteLock; public class HllSketchBuildBufferAggregator implements BufferAggregator { - /** for locking per buffer position (power of 2 to make index computation faster) */ + /** + * for locking per buffer position (power of 2 to make index computation faster) + */ private static final int NUM_STRIPES = 64; private final ColumnValueSelector selector; @@ -53,6 +56,13 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. + * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. + */ + private final byte[] emptySketch; + public HllSketchBuildBufferAggregator( final ColumnValueSelector selector, final int lgK, @@ -64,13 +74,29 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator this.lgK = lgK; this.tgtHllType = tgtHllType; this.size = size; + this.emptySketch = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction) + new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch)); } @Override public void init(final ByteBuffer buf, final int position) { + // Copy prebuilt empty sketch object. + + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptySketch); + } + finally { + buf.position(oldPosition); + } + + // Add an HllSketch for this chunk to our sketchCache. final WritableMemory mem = getMemory(buf).writableRegion(position, size); - putSketchIntoCache(buf, position, new HllSketch(lgK, tgtHllType, mem)); + putSketchIntoCache(buf, position, HllSketch.writableWrap(mem)); } /** @@ -162,7 +188,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator /** * compute lock index to avoid boxing in Striped.get() call + * * @param position + * * @return index */ static int lockIndex(final int position) @@ -172,7 +200,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator /** * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 + * * @param hashCode + * * @return smeared hashCode */ private static int smear(int hashCode) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 0167e1ad09b..aa27706bfca 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -40,7 +40,9 @@ import java.util.concurrent.locks.ReadWriteLock; public class HllSketchMergeBufferAggregator implements BufferAggregator { - /** for locking per buffer position (power of 2 to make index computation faster) */ + /** + * for locking per buffer position (power of 2 to make index computation faster) + */ private static final int NUM_STRIPES = 64; private final ColumnValueSelector selector; @@ -49,6 +51,13 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator private final int size; private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. + * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. + */ + private final byte[] emptyUnion; + public HllSketchMergeBufferAggregator( final ColumnValueSelector selector, final int lgK, @@ -60,17 +69,29 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator this.lgK = lgK; this.tgtHllType = tgtHllType; this.size = size; + this.emptyUnion = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction) + new Union(lgK, WritableMemory.wrap(emptyUnion)); } - @SuppressWarnings("ResultOfObjectAllocationIgnored") @Override public void init(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - // Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory. - // The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move. - // So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings. - new Union(lgK, mem); + // Copy prebuilt empty union object. + // Not necessary to cache a Union wrapper around the initialized memory, because: + // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". + // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the + // max size and therefore do not need to be potentially moved in-heap. + + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptyUnion); + } + finally { + buf.position(oldPosition); + } } /**