From f2b54de205eff996e76bd3d634360cc59c6b3577 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 16 Apr 2021 18:45:46 -0700 Subject: [PATCH] Vectorized versions of HllSketch aggregators. (#11115) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Vectorized versions of HllSketch aggregators. The patch uses the same "helper" approach as #10767 and #10304, and extends the tests to run in both vectorized and non-vectorized modes. Also includes some minor changes to the theta sketch vector aggregator: - Cosmetic changes to make the hll and theta implementations look more similar. - Extends the theta SQL tests to run in vectorized mode. * Updates post-code-review. * Fix javadoc. --- .../hll/HllSketchBuildAggregatorFactory.java | 21 +++ .../hll/HllSketchBuildBufferAggregator.java | 132 ++--------------- .../HllSketchBuildBufferAggregatorHelper.java | 134 ++++++++++++++++++ .../hll/HllSketchBuildVectorAggregator.java | 114 +++++++++++++++ .../hll/HllSketchMergeAggregatorFactory.java | 21 +++ .../hll/HllSketchMergeBufferAggregator.java | 83 ++--------- .../HllSketchMergeBufferAggregatorHelper.java | 96 +++++++++++++ .../hll/HllSketchMergeVectorAggregator.java | 115 +++++++++++++++ .../theta/SketchVectorAggregator.java | 18 +-- .../hll/HllSketchAggregatorTest.java | 44 ++++-- .../hll/sql/HllSketchSqlAggregatorTest.java | 93 ++++++++---- .../sql/ThetaSketchSqlAggregatorTest.java | 122 ++++++++++++---- 12 files changed, 726 insertions(+), 267 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java create mode 100644 
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 8abc305304e..df68180b7bf 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -26,8 +26,11 @@ import org.apache.datasketches.hll.TgtHllType; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -81,6 +84,24 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory ); } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return new HllSketchBuildVectorAggregator( + selectorFactory, + getFieldName(), + getLgK(), + TgtHllType.valueOf(getTgtHllType()), + getMaxIntermediateSize() + ); + } + /** * For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases. * The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator. 
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 4c39259a6a9..ab54215e52f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -19,22 +19,12 @@ package org.apache.druid.query.aggregation.datasketches.hll; -import com.google.common.util.concurrent.Striped; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; -import org.apache.datasketches.hll.Union; -import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.IdentityHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator builds sketches from raw data. 
@@ -42,26 +32,8 @@ import java.util.concurrent.locks.ReadWriteLock; */ public class HllSketchBuildBufferAggregator implements BufferAggregator { - - /** - * for locking per buffer position (power of 2 to make index computation faster) - */ - private static final int NUM_STRIPES = 64; - private final ColumnValueSelector selector; - private final int lgK; - private final TgtHllType tgtHllType; - private final int size; - private final IdentityHashMap memCache = new IdentityHashMap<>(); - private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); - - /** - * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. - * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The - * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. - */ - private final byte[] emptySketch; + private final HllSketchBuildBufferAggregatorHelper helper; public HllSketchBuildBufferAggregator( final ColumnValueSelector selector, @@ -71,39 +43,15 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator ) { this.selector = selector; - this.lgK = lgK; - this.tgtHllType = tgtHllType; - this.size = size; - this.emptySketch = new byte[size]; - - //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction) - new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch)); + this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); } @Override public void init(final ByteBuffer buf, final int position) { - // Copy prebuilt empty sketch object. - - final int oldPosition = buf.position(); - try { - buf.position(position); - buf.put(emptySketch); - } - finally { - buf.position(oldPosition); - } - - // Add an HllSketch for this chunk to our sketchCache. 
- final WritableMemory mem = getMemory(buf).writableRegion(position, size); - putSketchIntoCache(buf, position, HllSketch.writableWrap(mem)); + helper.init(buf, position); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -111,40 +59,20 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator if (value == null) { return; } - final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock(); - lock.lock(); - try { - final HllSketch sketch = sketchCache.get(buf).get(position); - HllSketchBuildAggregator.updateSketch(sketch, value); - } - finally { - lock.unlock(); - } + + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public Object get(final ByteBuffer buf, final int position) { - final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); - lock.lock(); - try { - return sketchCache.get(buf).get(position).copy(); - } - finally { - lock.unlock(); - } + return helper.get(buf, position); } @Override public void close() { - memCache.clear(); - sketchCache.clear(); + helper.clear(); } @Override @@ -159,11 +87,6 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator throw new UnsupportedOperationException("Not implemented"); } - private WritableMemory getMemory(final ByteBuffer buf) - { - return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN)); - } - /** * In very rare cases sketches can exceed given memory, request on-heap memory and move there. 
* We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions. @@ -171,44 +94,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator @Override public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) { - HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition); - final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size); - if (sketch.isSameResource(oldMem)) { // sketch has not moved - final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size); - sketch = HllSketch.writableWrap(newMem); - } - putSketchIntoCache(newBuf, newPosition, sketch); - } - - private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch) - { - final Int2ObjectMap map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>()); - map.put(position, sketch); - } - - /** - * compute lock index to avoid boxing in Striped.get() call - * - * @param position - * - * @return index - */ - static int lockIndex(final int position) - { - return smear(position) % NUM_STRIPES; - } - - /** - * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548 - * - * @param hashCode - * - * @return smeared hashCode - */ - private static int smear(int hashCode) - { - hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); - return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); + helper.relocate(oldPosition, newPosition, oldBuf, newBuf); } @Override @@ -218,6 +104,6 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator // lgK should be inspected because different execution paths exist in HllSketch.update() that is called from // @CalledFromHotLoop-annotated aggregate() depending on the lgK. 
// See https://github.com/apache/druid/pull/6893#discussion_r250726028 - inspector.visit("lgK", lgK); + inspector.visit("lgK", helper.getLgK()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java new file mode 100644 index 00000000000..466f1aca854 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregatorHelper.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.memory.WritableMemory; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.IdentityHashMap; + +public class HllSketchBuildBufferAggregatorHelper +{ + private final int lgK; + private final int size; + private final IdentityHashMap memCache = new IdentityHashMap<>(); + private final IdentityHashMap> sketchCache = new IdentityHashMap<>(); + + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image. + * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link org.apache.datasketches.hll.Union} objects. + */ + private final byte[] emptySketch; + + public HllSketchBuildBufferAggregatorHelper(final int lgK, final TgtHllType tgtHllType, final int size) + { + this.lgK = lgK; + this.size = size; + this.emptySketch = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction) + new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(final ByteBuffer buf, final int position) + { + // Copy prebuilt empty sketch object. + + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptySketch); + } + finally { + buf.position(oldPosition); + } + + // Add an HllSketch for this chunk to our sketchCache. 
+ final WritableMemory mem = getMemory(buf).writableRegion(position, size); + putSketchIntoCache(buf, position, HllSketch.writableWrap(mem)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + */ + public Object get(ByteBuffer buf, int position) + { + return sketchCache.get(buf).get(position).copy(); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}. + */ + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf) + { + HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition); + final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size); + if (sketch.isSameResource(oldMem)) { // sketch has not moved + final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size); + sketch = HllSketch.writableWrap(newMem); + } + putSketchIntoCache(newBuf, newPosition, sketch); + } + + /** + * Retrieves the sketch at a particular position. + */ + public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position) + { + return sketchCache.get(buf).get(position); + } + + /** + * Clean up resources used by this helper. 
+ */ + public void clear() + { + memCache.clear(); + sketchCache.clear(); + } + + public int getLgK() + { + return lgK; + } + + private WritableMemory getMemory(final ByteBuffer buf) + { + return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN)); + } + + private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch) + { + final Int2ObjectMap map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>()); + map.put(position, sketch); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java new file mode 100644 index 00000000000..506c9c3a2a7 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.function.Supplier; + +public class HllSketchBuildVectorAggregator implements VectorAggregator +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final Supplier objectSupplier; + + HllSketchBuildVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int lgK, + final TgtHllType tgtHllType, + final int size + ) + { + this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); + this.objectSupplier = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Object[] vector = objectSupplier.get(); + for (int i = startRow; i < endRow; i++) { + final Object value = vector[i]; + if (value != null) { + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); + } + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset + ) + { + final Object[] vector = objectSupplier.get(); + + for (int i = 0; i < numRows; i++) { + final Object o = vector[rows != null ? 
rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o); + } + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return helper.get(buf, position); + } + + /** + * In very rare cases sketches can exceed given memory, request on-heap memory and move there. + * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions. + */ + @Override + public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) + { + helper.relocate(oldPosition, newPosition, oldBuf, newBuf); + } + + @Override + public void close() + { + helper.clear(); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 74afea31bfe..050cf59e1fc 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -29,8 +29,11 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; @@ -102,6 
+105,24 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory ); } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + return new HllSketchMergeVectorAggregator( + selectorFactory, + getFieldName(), + getLgK(), + TgtHllType.valueOf(getTgtHllType()), + getMaxIntermediateSize() + ); + } + @Override public int getMaxIntermediateSize() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 7161c25fb6b..278a5c53be6 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -19,7 +19,6 @@ package org.apache.druid.query.aggregation.datasketches.hll; -import com.google.common.util.concurrent.Striped; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; @@ -30,8 +29,6 @@ import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; /** * This aggregator merges existing sketches. 
@@ -39,24 +36,8 @@ import java.util.concurrent.locks.ReadWriteLock; */ public class HllSketchMergeBufferAggregator implements BufferAggregator { - - /** - * for locking per buffer position (power of 2 to make index computation faster) - */ - private static final int NUM_STRIPES = 64; - private final ColumnValueSelector selector; - private final int lgK; - private final TgtHllType tgtHllType; - private final int size; - private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); - - /** - * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. - * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The - * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. - */ - private final byte[] emptyUnion; + private final HllSketchMergeBufferAggregatorHelper helper; public HllSketchMergeBufferAggregator( final ColumnValueSelector selector, @@ -66,39 +47,15 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator ) { this.selector = selector; - this.lgK = lgK; - this.tgtHllType = tgtHllType; - this.size = size; - this.emptyUnion = new byte[size]; - - //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction) - new Union(lgK, WritableMemory.wrap(emptyUnion)); + this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size); } @Override public void init(final ByteBuffer buf, final int position) { - // Copy prebuilt empty union object. - // Not necessary to cache a Union wrapper around the initialized memory, because: - // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". - // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the - // max size and therefore do not need to be potentially moved in-heap. 
- - final int oldPosition = buf.position(); - try { - buf.position(position); - buf.put(emptyUnion); - } - finally { - buf.position(oldPosition); - } + helper.init(buf, position); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public void aggregate(final ByteBuffer buf, final int position) { @@ -106,36 +63,18 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator if (sketch == null) { return; } - final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - union.update(sketch); - } - finally { - lock.unlock(); - } + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Union union = Union.writableWrap(mem); + union.update(sketch); } - /** - * This method uses locks because it can be used during indexing, - * and Druid can call aggregate() and get() concurrently - * See https://github.com/druid-io/druid/pull/3956 - */ @Override public Object get(final ByteBuffer buf, final int position) { - final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); - final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock(); - lock.lock(); - try { - final Union union = Union.writableWrap(mem); - return union.getResult(tgtHllType); - } - finally { - lock.unlock(); - } + return helper.get(buf, position); } @Override @@ -163,6 +102,6 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator // lgK should be inspected because different execution paths exist in Union.update() that is called from // 
@CalledFromHotLoop-annotated aggregate() depending on the lgK. // See https://github.com/apache/druid/pull/6893#discussion_r250726028 - inspector.visit("lgK", lgK); + inspector.visit("lgK", helper.getLgK()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java new file mode 100644 index 00000000000..d6030e88501 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.hll.Union; +import org.apache.datasketches.memory.WritableMemory; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class HllSketchMergeBufferAggregatorHelper +{ + private final int lgK; + private final TgtHllType tgtHllType; + private final int size; + + /** + * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image. + * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The + * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects. + */ + private final byte[] emptyUnion; + + public HllSketchMergeBufferAggregatorHelper(int lgK, TgtHllType tgtHllType, int size) + { + this.lgK = lgK; + this.tgtHllType = tgtHllType; + this.size = size; + this.emptyUnion = new byte[size]; + + //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction) + new Union(lgK, WritableMemory.wrap(emptyUnion)); + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#init}. + */ + public void init(final ByteBuffer buf, final int position) + { + // Copy prebuilt empty union object. + // Not necessary to cache a Union wrapper around the initialized memory, because: + // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get". + // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the + // max size and therefore do not need to be potentially moved in-heap. 
+ + final int oldPosition = buf.position(); + try { + buf.position(position); + buf.put(emptyUnion); + } + finally { + buf.position(oldPosition); + } + } + + /** + * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and + * {@link org.apache.druid.query.aggregation.VectorAggregator#get}. + */ + public Object get(ByteBuffer buf, int position) + { + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); + final Union union = Union.writableWrap(mem); + return union.getResult(tgtHllType); + } + + public int getLgK() + { + return lgK; + } + + public int getSize() + { + return size; + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java new file mode 100644 index 00000000000..d97c0b5ce67 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.hll.Union; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.function.Supplier; + +public class HllSketchMergeVectorAggregator implements VectorAggregator +{ + private final HllSketchMergeBufferAggregatorHelper helper; + private final Supplier objectSupplier; + + HllSketchMergeVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int lgK, + final TgtHllType tgtHllType, + final int size + ) + { + this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size); + this.objectSupplier = + ColumnProcessors.makeVectorProcessor( + column, + ToObjectVectorColumnProcessorFactory.INSTANCE, + columnSelectorFactory + ); + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + helper.init(buf, position); + } + + @Override + public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + { + final Object[] vector = objectSupplier.get(); + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Union union = Union.writableWrap(mem); + for (int i = startRow; i < endRow; i++) { + union.update((HllSketch) vector[i]); + } + } + + @Override + public void aggregate( + final ByteBuffer buf, + final int numRows, + final int[] positions, + @Nullable final int[] rows, + final int positionOffset 
+ ) + { + final Object[] vector = objectSupplier.get(); + + for (int i = 0; i < numRows; i++) { + final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i]; + + if (o != null) { + final int position = positions[i] + positionOffset; + + final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN) + .writableRegion(position, helper.getSize()); + + final Union union = Union.writableWrap(mem); + union.update(o); + } + } + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return helper.get(buf, position); + } + + @Override + public void close() + { + // Nothing to close. + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index b5b9ad842cf..a862265d561 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -31,18 +31,18 @@ import java.util.function.Supplier; public class SketchVectorAggregator implements VectorAggregator { - private final Supplier toObjectProcessor; private final SketchBufferAggregatorHelper helper; + private final Supplier objectSupplier; - public SketchVectorAggregator( - VectorColumnSelectorFactory columnSelectorFactory, - String column, - int size, - int maxIntermediateSize + SketchVectorAggregator( + final VectorColumnSelectorFactory columnSelectorFactory, + final String column, + final int size, + final int maxIntermediateSize ) { this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); - this.toObjectProcessor = + this.objectSupplier = ColumnProcessors.makeVectorProcessor( column, ToObjectVectorColumnProcessorFactory.INSTANCE, @@ -60,7 +60,7 
@@ public class SketchVectorAggregator implements VectorAggregator public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) { final Union union = helper.getOrCreateUnion(buf, position); - final Object[] vector = toObjectProcessor.get(); + final Object[] vector = objectSupplier.get(); for (int i = startRow; i < endRow; i++) { final Object o = vector[i]; @@ -79,7 +79,7 @@ public class SketchVectorAggregator implements VectorAggregator final int positionOffset ) { - final Object[] vector = toObjectProcessor.get(); + final Object[] vector = objectSupplier.get(); for (int i = 0; i < numRows; i++) { final Object o = vector[rows != null ? rows[i] : i]; diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index a299257fc71..afac5739db9 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.groupby.GroupByQuery; @@ -54,23 +55,27 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest private static final boolean ROUND = true; private final AggregationTestHelper helper; + private final QueryContexts.Vectorize vectorize; @Rule public final 
TemporaryFolder tempFolder = new TemporaryFolder(); - public HllSketchAggregatorTest(GroupByQueryConfig config) + public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize) { HllSketchModule.registerSerde(); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( new HllSketchModule().getJacksonModules(), config, tempFolder); + this.vectorize = QueryContexts.Vectorize.fromString(vectorize); } - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - constructors.add(new Object[]{config}); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{config, vectorize}); + } } return constructors; } @@ -224,10 +229,32 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest ) .setPostAggregatorSpecs( ImmutableList.of( - new HllSketchToEstimatePostAggregator("estimate", new FieldAccessPostAggregator("f1", "sketch"), false), - new HllSketchToEstimateWithBoundsPostAggregator("estimateWithBounds", new FieldAccessPostAggregator("f1", "sketch"), 2), - new HllSketchToStringPostAggregator("summary", new FieldAccessPostAggregator("f1", "sketch")), - new HllSketchUnionPostAggregator("union", ImmutableList.of(new FieldAccessPostAggregator("f1", "sketch"), new FieldAccessPostAggregator("f2", "sketch")), null, null) + new HllSketchToEstimatePostAggregator( + "estimate", + new FieldAccessPostAggregator("f1", "sketch"), + false + ), + new HllSketchToEstimateWithBoundsPostAggregator( + "estimateWithBounds", + new FieldAccessPostAggregator( + "f1", + "sketch" + ), + 2 + ), + new HllSketchToStringPostAggregator( + "summary", + new FieldAccessPostAggregator("f1", "sketch") + ), + new HllSketchUnionPostAggregator( + "union", + ImmutableList.of(new FieldAccessPostAggregator( + "f1", 
+ "sketch" + ), new FieldAccessPostAggregator("f2", "sketch")), + null, + null + ) ) ) .build() @@ -320,7 +347,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest ); } - private static String buildGroupByQueryJson( + private String buildGroupByQueryJson( String aggregationType, String aggregationFieldName, boolean aggregationRound @@ -338,6 +365,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest .put("dimensions", Collections.emptyList()) .put("aggregations", Collections.singletonList(aggregation)) .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z")) + .put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString())) .build(); return toJson(object); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 039801c5efc..b1faed1cbed 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -84,34 +85,60 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import 
org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +@RunWith(Parameterized.class) public class HllSketchSqlAggregatorTest extends CalciteTestBase { private static final String DATA_SOURCE = "foo"; private static final boolean ROUND = true; - private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - ); private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER); - + + private final Map queryContext; private SpecificSegmentsQuerySegmentWalker walker; private SqlLifecycleFactory sqlLifecycleFactory; + public HllSketchSqlAggregatorTest(final String vectorize) + { + this.queryContext = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy", + QueryContexts.VECTORIZE_KEY, vectorize, + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize + ); + } + + @Parameterized.Parameters(name = "vectorize = {0}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{vectorize}); + } + return constructors; + } + @BeforeClass public static void setUpClass() { @@ -207,6 +234,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase @Test public void testApproxCountDistinctHllSketch() throws Exception { + // Can't vectorize due to CONCAT expression. 
+ cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -222,7 +252,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -317,8 +347,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND) ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(), + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); } @@ -327,6 +358,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase @Test public void testAvgDailyCountDistinctHllSketch() throws Exception { + // Can't vectorize due to outer query, which runs on an inline datasource. 
+ cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -340,7 +374,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -379,11 +413,14 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase new FinalizingFieldAccessPostAggregator("a0", "a0:a") ) ) - .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( - ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), - "d0" - )) + .context(queryContext) .build() + .withOverriddenContext( + BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( + ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), + "d0" + ) + ) ) ) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) @@ -414,7 +451,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase ) ) ) - .setContext(QUERY_CONTEXT_DEFAULT) + .setContext(queryContext) .build(); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); @@ -437,7 +474,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = - sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); + sqlLifecycle.runSimple(sql, queryContext, DEFAULT_PARAMETERS, authenticationResult).toList(); final int expected = NullHandling.replaceWithDefault() ? 
1 : 2; Assert.assertEquals(expected, results.size()); } @@ -466,7 +503,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -598,11 +635,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true) ) ) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - )) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); @@ -619,7 +654,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql2, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -670,13 +705,19 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0")) ) ) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - )) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); } + + private void cannotVectorize() + { + if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY)) + == QueryContexts.Vectorize.FORCE) { + expectedException.expectMessage("Cannot vectorize"); + } + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java index 9201c91b994..0aac2b088cc 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -81,14 +82,20 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +@RunWith(Parameterized.class) public class ThetaSketchSqlAggregatorTest extends CalciteTestBase { private static final String DATA_SOURCE = "foo"; @@ -96,9 +103,6 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; - private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_QUERY_ID, "dummy" - ); @BeforeClass public static void setUpClass() @@ -113,15 +117,38 @@ public class 
ThetaSketchSqlAggregatorTest extends CalciteTestBase resourceCloser.close(); } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public QueryLogHook queryLogHook = QueryLogHook.create(); + private final Map queryContext; private SpecificSegmentsQuerySegmentWalker walker; private SqlLifecycleFactory sqlLifecycleFactory; + public ThetaSketchSqlAggregatorTest(final String vectorize) + { + this.queryContext = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy", + QueryContexts.VECTORIZE_KEY, vectorize, + QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize + ); + } + + @Parameterized.Parameters(name = "vectorize = {0}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (String vectorize : new String[]{"false", "true", "force"}) { + constructors.add(new Object[]{vectorize}); + } + return constructors; + } + @Before public void setUp() throws Exception { @@ -206,21 +233,30 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase @Test public void testApproxCountDistinctThetaSketch() throws Exception { + // Cannot vectorize due to SUBSTRING. 
+ cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" + " SUM(cnt),\n" - + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" // uppercase - + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered - + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn - + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression - + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" // on native theta sketch column - + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" // on native theta sketch column + + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" + // uppercase + + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" + // lowercase; also, filtered + + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" + // on extractionFn + + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" + // on expression + + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" + // on native theta sketch column + + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" + // on native theta sketch column + "FROM druid.foo"; // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -319,8 +355,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null) ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(), + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); } @@ -328,6 +365,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase @Test public void testAvgDailyCountDistinctThetaSketch() throws Exception { + // Can't 
vectorize due to outer query (it operates on an inlined data source, which cannot be vectorized). + cannotVectorize(); + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); final String sql = "SELECT\n" @@ -337,7 +377,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -358,7 +398,11 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of( Filtration.eternity() ))) - .granularity(new PeriodGranularity(Period.days(1), null, DateTimeZone.UTC)) + .granularity(new PeriodGranularity( + Period.days(1), + null, + DateTimeZone.UTC + )) .aggregators( Collections.singletonList( new SketchMergeAggregatorFactory( @@ -373,14 +417,25 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase ) .postAggregators( ImmutableList.of( - new FinalizingFieldAccessPostAggregator("a0", "a0:a") + new FinalizingFieldAccessPostAggregator( + "a0", + "a0:a" + ) ) ) - .context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( - ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"), - "d0" - )) + .context(queryContext) .build() + .withOverriddenContext( + BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( + ImmutableMap.of( + "skipEmptyBuckets", + true, + "sqlQueryId", + "dummy" + ), + "d0" + ) + ) ) ) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) @@ -388,8 +443,8 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase .setAggregatorSpecs( NullHandling.replaceWithDefault() ? 
Arrays.asList( - new LongSumAggregatorFactory("_a0:sum", "a0"), - new CountAggregatorFactory("_a0:count") + new LongSumAggregatorFactory("_a0:sum", "a0"), + new CountAggregatorFactory("_a0:count") ) : Arrays.asList( new LongSumAggregatorFactory("_a0:sum", "a0"), @@ -411,7 +466,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase ) ) ) - .setContext(QUERY_CONTEXT_DEFAULT) + .setContext(queryContext) .build(); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); @@ -439,7 +494,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -598,8 +653,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase null ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(); + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query @@ -617,7 +673,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase // Verify results final List results = sqlLifecycle.runSimple( sql2, - QUERY_CONTEXT_DEFAULT, + queryContext, DEFAULT_PARAMETERS, authenticationResult ).toList(); @@ -664,11 +720,19 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase null ) ) - .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) - .build(); - + .context(queryContext) + .build() + .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)); // Verify query Assert.assertEquals(expectedQuery, actualQuery); } + + private void cannotVectorize() + { + if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY)) + == QueryContexts.Vectorize.FORCE) { + expectedException.expectMessage("Cannot vectorize"); + } + } }