Vectorized versions of HllSketch aggregators. (#11115)

* Vectorized versions of HllSketch aggregators.

The patch uses the same "helper" approach as #10767 and #10304, and
extends the tests to run in both vectorized and non-vectorized modes.

Also includes some minor changes to the theta sketch vector aggregator:

- Cosmetic changes to make the hll and theta implementations look
  more similar.
- Extends the theta SQL tests to run in vectorized mode.

* Updates post-code-review.

* Fix javadoc.
This commit is contained in:
Gian Merlino 2021-04-16 18:45:46 -07:00 committed by GitHub
parent 26d1074ade
commit f2b54de205
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 726 additions and 267 deletions

View File

@ -26,8 +26,11 @@ import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -81,6 +84,24 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
); );
} }
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
return new HllSketchBuildVectorAggregator(
selectorFactory,
getFieldName(),
getLgK(),
TgtHllType.valueOf(getTgtHllType()),
getMaxIntermediateSize()
);
}
/** /**
* For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases. * For the HLL_4 sketch type, this value can be exceeded slightly in extremely rare cases.
* The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator. * The sketch will request on-heap memory and move there. It is handled in HllSketchBuildBufferAggregator.

View File

@ -19,22 +19,12 @@
package org.apache.druid.query.aggregation.datasketches.hll; package org.apache.druid.query.aggregation.datasketches.hll;
import com.google.common.util.concurrent.Striped;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
/** /**
* This aggregator builds sketches from raw data. * This aggregator builds sketches from raw data.
@ -42,26 +32,8 @@ import java.util.concurrent.locks.ReadWriteLock;
*/ */
public class HllSketchBuildBufferAggregator implements BufferAggregator public class HllSketchBuildBufferAggregator implements BufferAggregator
{ {
/**
* for locking per buffer position (power of 2 to make index computation faster)
*/
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<Object> selector; private final ColumnValueSelector<Object> selector;
private final int lgK; private final HllSketchBuildBufferAggregatorHelper helper;
private final TgtHllType tgtHllType;
private final int size;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
* {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
*/
private final byte[] emptySketch;
public HllSketchBuildBufferAggregator( public HllSketchBuildBufferAggregator(
final ColumnValueSelector<Object> selector, final ColumnValueSelector<Object> selector,
@ -71,39 +43,15 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
) )
{ {
this.selector = selector; this.selector = selector;
this.lgK = lgK; this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
this.tgtHllType = tgtHllType;
this.size = size;
this.emptySketch = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
} }
@Override @Override
public void init(final ByteBuffer buf, final int position) public void init(final ByteBuffer buf, final int position)
{ {
// Copy prebuilt empty sketch object. helper.init(buf, position);
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptySketch);
}
finally {
buf.position(oldPosition);
}
// Add an HllSketch for this chunk to our sketchCache.
final WritableMemory mem = getMemory(buf).writableRegion(position, size);
putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
} }
/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* See https://github.com/druid-io/druid/pull/3956
*/
@Override @Override
public void aggregate(final ByteBuffer buf, final int position) public void aggregate(final ByteBuffer buf, final int position)
{ {
@ -111,40 +59,20 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
if (value == null) { if (value == null) {
return; return;
} }
final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
lock.lock(); HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
try {
final HllSketch sketch = sketchCache.get(buf).get(position);
HllSketchBuildAggregator.updateSketch(sketch, value);
}
finally {
lock.unlock();
}
} }
/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* See https://github.com/druid-io/druid/pull/3956
*/
@Override @Override
public Object get(final ByteBuffer buf, final int position) public Object get(final ByteBuffer buf, final int position)
{ {
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); return helper.get(buf, position);
lock.lock();
try {
return sketchCache.get(buf).get(position).copy();
}
finally {
lock.unlock();
}
} }
@Override @Override
public void close() public void close()
{ {
memCache.clear(); helper.clear();
sketchCache.clear();
} }
@Override @Override
@ -159,11 +87,6 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("Not implemented"); throw new UnsupportedOperationException("Not implemented");
} }
private WritableMemory getMemory(final ByteBuffer buf)
{
return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
}
/** /**
* In very rare cases sketches can exceed given memory, request on-heap memory and move there. * In very rare cases sketches can exceed given memory, request on-heap memory and move there.
* We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions. * We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
@ -171,44 +94,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
@Override @Override
public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf) public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
{ {
HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition); helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
if (sketch.isSameResource(oldMem)) { // sketch has not moved
final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
sketch = HllSketch.writableWrap(newMem);
}
putSketchIntoCache(newBuf, newPosition, sketch);
}
private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
{
final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
}
/**
* compute lock index to avoid boxing in Striped.get() call
*
* @param position
*
* @return index
*/
static int lockIndex(final int position)
{
return smear(position) % NUM_STRIPES;
}
/**
* see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
*
* @param hashCode
*
* @return smeared hashCode
*/
private static int smear(int hashCode)
{
hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
} }
@Override @Override
@ -218,6 +104,6 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
// lgK should be inspected because different execution paths exist in HllSketch.update() that is called from // lgK should be inspected because different execution paths exist in HllSketch.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK. // @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028 // See https://github.com/apache/druid/pull/6893#discussion_r250726028
inspector.visit("lgK", lgK); inspector.visit("lgK", helper.getLgK());
} }
} }

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.memory.WritableMemory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class HllSketchBuildBufferAggregatorHelper
{
private final int lgK;
private final int size;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
* {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link org.apache.datasketches.hll.Union} objects.
*/
private final byte[] emptySketch;
public HllSketchBuildBufferAggregatorHelper(final int lgK, final TgtHllType tgtHllType, final int size)
{
this.lgK = lgK;
this.size = size;
this.emptySketch = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
}
/**
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
*/
public void init(final ByteBuffer buf, final int position)
{
// Copy prebuilt empty sketch object.
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptySketch);
}
finally {
buf.position(oldPosition);
}
// Add an HllSketch for this chunk to our sketchCache.
final WritableMemory mem = getMemory(buf).writableRegion(position, size);
putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
}
/**
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
*/
public Object get(ByteBuffer buf, int position)
{
return sketchCache.get(buf).get(position).copy();
}
/**
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
*/
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf)
{
HllSketch sketch = sketchCache.get(oldBuf).get(oldPosition);
final WritableMemory oldMem = getMemory(oldBuf).writableRegion(oldPosition, size);
if (sketch.isSameResource(oldMem)) { // sketch has not moved
final WritableMemory newMem = getMemory(newBuf).writableRegion(newPosition, size);
sketch = HllSketch.writableWrap(newMem);
}
putSketchIntoCache(newBuf, newPosition, sketch);
}
/**
* Retrieves the sketch at a particular position.
*/
public HllSketch getSketchAtPosition(final ByteBuffer buf, final int position)
{
return sketchCache.get(buf).get(position);
}
/**
* Clean up resources used by this helper.
*/
public void clear()
{
memCache.clear();
sketchCache.clear();
}
public int getLgK()
{
return lgK;
}
private WritableMemory getMemory(final ByteBuffer buf)
{
return memCache.computeIfAbsent(buf, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
}
private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
{
final Int2ObjectMap<HllSketch> map = sketchCache.computeIfAbsent(buf, b -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
public class HllSketchBuildVectorAggregator implements VectorAggregator
{
private final HllSketchBuildBufferAggregatorHelper helper;
private final Supplier<Object[]> objectSupplier;
HllSketchBuildVectorAggregator(
final VectorColumnSelectorFactory columnSelectorFactory,
final String column,
final int lgK,
final TgtHllType tgtHllType,
final int size
)
{
this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size);
this.objectSupplier =
ColumnProcessors.makeVectorProcessor(
column,
ToObjectVectorColumnProcessorFactory.INSTANCE,
columnSelectorFactory
);
}
@Override
public void init(final ByteBuffer buf, final int position)
{
helper.init(buf, position);
}
@Override
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final Object[] vector = objectSupplier.get();
for (int i = startRow; i < endRow; i++) {
final Object value = vector[i];
if (value != null) {
HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value);
}
}
}
@Override
public void aggregate(
final ByteBuffer buf,
final int numRows,
final int[] positions,
@Nullable final int[] rows,
final int positionOffset
)
{
final Object[] vector = objectSupplier.get();
for (int i = 0; i < numRows; i++) {
final Object o = vector[rows != null ? rows[i] : i];
if (o != null) {
final int position = positions[i] + positionOffset;
HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o);
}
}
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
}
/**
* In very rare cases sketches can exceed given memory, request on-heap memory and move there.
* We need to identify such sketches and reuse the same objects as opposed to wrapping new memory regions.
*/
@Override
public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
{
helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
}
@Override
public void close()
{
helper.clear();
}
}

View File

@ -29,8 +29,11 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -102,6 +105,24 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
); );
} }
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
return new HllSketchMergeVectorAggregator(
selectorFactory,
getFieldName(),
getLgK(),
TgtHllType.valueOf(getTgtHllType()),
getMaxIntermediateSize()
);
}
@Override @Override
public int getMaxIntermediateSize() public int getMaxIntermediateSize()
{ {

View File

@ -19,7 +19,6 @@
package org.apache.druid.query.aggregation.datasketches.hll; package org.apache.druid.query.aggregation.datasketches.hll;
import com.google.common.util.concurrent.Striped;
import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union; import org.apache.datasketches.hll.Union;
@ -30,8 +29,6 @@ import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
/** /**
* This aggregator merges existing sketches. * This aggregator merges existing sketches.
@ -39,24 +36,8 @@ import java.util.concurrent.locks.ReadWriteLock;
*/ */
public class HllSketchMergeBufferAggregator implements BufferAggregator public class HllSketchMergeBufferAggregator implements BufferAggregator
{ {
/**
* for locking per buffer position (power of 2 to make index computation faster)
*/
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<HllSketch> selector; private final ColumnValueSelector<HllSketch> selector;
private final int lgK; private final HllSketchMergeBufferAggregatorHelper helper;
private final TgtHllType tgtHllType;
private final int size;
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
* {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
*/
private final byte[] emptyUnion;
public HllSketchMergeBufferAggregator( public HllSketchMergeBufferAggregator(
final ColumnValueSelector<HllSketch> selector, final ColumnValueSelector<HllSketch> selector,
@ -66,39 +47,15 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
) )
{ {
this.selector = selector; this.selector = selector;
this.lgK = lgK; this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
this.tgtHllType = tgtHllType;
this.size = size;
this.emptyUnion = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
new Union(lgK, WritableMemory.wrap(emptyUnion));
} }
@Override @Override
public void init(final ByteBuffer buf, final int position) public void init(final ByteBuffer buf, final int position)
{ {
// Copy prebuilt empty union object. helper.init(buf, position);
// Not necessary to cache a Union wrapper around the initialized memory, because:
// - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
// - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
// max size and therefore do not need to be potentially moved in-heap.
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptyUnion);
}
finally {
buf.position(oldPosition);
}
} }
/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* See https://github.com/druid-io/druid/pull/3956
*/
@Override @Override
public void aggregate(final ByteBuffer buf, final int position) public void aggregate(final ByteBuffer buf, final int position)
{ {
@ -106,36 +63,18 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
if (sketch == null) { if (sketch == null) {
return; return;
} }
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).writeLock(); final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
lock.lock(); .writableRegion(position, helper.getSize());
try {
final Union union = Union.writableWrap(mem); final Union union = Union.writableWrap(mem);
union.update(sketch); union.update(sketch);
}
finally {
lock.unlock();
}
} }
/**
* This method uses locks because it can be used during indexing,
* and Druid can call aggregate() and get() concurrently
* See https://github.com/druid-io/druid/pull/3956
*/
@Override @Override
public Object get(final ByteBuffer buf, final int position) public Object get(final ByteBuffer buf, final int position)
{ {
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size); return helper.get(buf, position);
final Lock lock = stripedLock.getAt(HllSketchBuildBufferAggregator.lockIndex(position)).readLock();
lock.lock();
try {
final Union union = Union.writableWrap(mem);
return union.getResult(tgtHllType);
}
finally {
lock.unlock();
}
} }
@Override @Override
@ -163,6 +102,6 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
// lgK should be inspected because different execution paths exist in Union.update() that is called from // lgK should be inspected because different execution paths exist in Union.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK. // @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028 // See https://github.com/apache/druid/pull/6893#discussion_r250726028
inspector.visit("lgK", lgK); inspector.visit("lgK", helper.getLgK());
} }
} }

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class HllSketchMergeBufferAggregatorHelper
{
private final int lgK;
private final TgtHllType tgtHllType;
private final int size;
/**
* Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
* {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
* "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
*/
private final byte[] emptyUnion;
public HllSketchMergeBufferAggregatorHelper(int lgK, TgtHllType tgtHllType, int size)
{
this.lgK = lgK;
this.tgtHllType = tgtHllType;
this.size = size;
this.emptyUnion = new byte[size];
//noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
new Union(lgK, WritableMemory.wrap(emptyUnion));
}
/**
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
*/
public void init(final ByteBuffer buf, final int position)
{
// Copy prebuilt empty union object.
// Not necessary to cache a Union wrapper around the initialized memory, because:
// - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
// - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
// max size and therefore do not need to be potentially moved in-heap.
final int oldPosition = buf.position();
try {
buf.position(position);
buf.put(emptyUnion);
}
finally {
buf.position(oldPosition);
}
}
/**
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
* {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
*/
public Object get(ByteBuffer buf, int position)
{
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
final Union union = Union.writableWrap(mem);
return union.getResult(tgtHllType);
}
public int getLgK()
{
return lgK;
}
public int getSize()
{
return size;
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Supplier;
public class HllSketchMergeVectorAggregator implements VectorAggregator
{
private final HllSketchMergeBufferAggregatorHelper helper;
private final Supplier<Object[]> objectSupplier;
HllSketchMergeVectorAggregator(
final VectorColumnSelectorFactory columnSelectorFactory,
final String column,
final int lgK,
final TgtHllType tgtHllType,
final int size
)
{
this.helper = new HllSketchMergeBufferAggregatorHelper(lgK, tgtHllType, size);
this.objectSupplier =
ColumnProcessors.makeVectorProcessor(
column,
ToObjectVectorColumnProcessorFactory.INSTANCE,
columnSelectorFactory
);
}
@Override
public void init(final ByteBuffer buf, final int position)
{
helper.init(buf, position);
}
@Override
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final Object[] vector = objectSupplier.get();
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
.writableRegion(position, helper.getSize());
final Union union = Union.writableWrap(mem);
for (int i = startRow; i < endRow; i++) {
union.update((HllSketch) vector[i]);
}
}
@Override
public void aggregate(
final ByteBuffer buf,
final int numRows,
final int[] positions,
@Nullable final int[] rows,
final int positionOffset
)
{
final Object[] vector = objectSupplier.get();
for (int i = 0; i < numRows; i++) {
final HllSketch o = (HllSketch) vector[rows != null ? rows[i] : i];
if (o != null) {
final int position = positions[i] + positionOffset;
final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN)
.writableRegion(position, helper.getSize());
final Union union = Union.writableWrap(mem);
union.update(o);
}
}
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
}
@Override
public void close()
{
// Nothing to close.
}
}

View File

@ -31,18 +31,18 @@ import java.util.function.Supplier;
public class SketchVectorAggregator implements VectorAggregator public class SketchVectorAggregator implements VectorAggregator
{ {
private final Supplier<Object[]> toObjectProcessor;
private final SketchBufferAggregatorHelper helper; private final SketchBufferAggregatorHelper helper;
private final Supplier<Object[]> objectSupplier;
public SketchVectorAggregator( SketchVectorAggregator(
VectorColumnSelectorFactory columnSelectorFactory, final VectorColumnSelectorFactory columnSelectorFactory,
String column, final String column,
int size, final int size,
int maxIntermediateSize final int maxIntermediateSize
) )
{ {
this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
this.toObjectProcessor = this.objectSupplier =
ColumnProcessors.makeVectorProcessor( ColumnProcessors.makeVectorProcessor(
column, column,
ToObjectVectorColumnProcessorFactory.INSTANCE, ToObjectVectorColumnProcessorFactory.INSTANCE,
@ -60,7 +60,7 @@ public class SketchVectorAggregator implements VectorAggregator
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{ {
final Union union = helper.getOrCreateUnion(buf, position); final Union union = helper.getOrCreateUnion(buf, position);
final Object[] vector = toObjectProcessor.get(); final Object[] vector = objectSupplier.get();
for (int i = startRow; i < endRow; i++) { for (int i = startRow; i < endRow; i++) {
final Object o = vector[i]; final Object o = vector[i];
@ -79,7 +79,7 @@ public class SketchVectorAggregator implements VectorAggregator
final int positionOffset final int positionOffset
) )
{ {
final Object[] vector = toObjectProcessor.get(); final Object[] vector = objectSupplier.get();
for (int i = 0; i < numRows; i++) { for (int i = 0; i < numRows; i++) {
final Object o = vector[rows != null ? rows[i] : i]; final Object o = vector[rows != null ? rows[i] : i];

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQuery;
@ -54,23 +55,27 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
private static final boolean ROUND = true; private static final boolean ROUND = true;
private final AggregationTestHelper helper; private final AggregationTestHelper helper;
private final QueryContexts.Vectorize vectorize;
@Rule @Rule
public final TemporaryFolder tempFolder = new TemporaryFolder(); public final TemporaryFolder tempFolder = new TemporaryFolder();
public HllSketchAggregatorTest(GroupByQueryConfig config) public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize)
{ {
HllSketchModule.registerSerde(); HllSketchModule.registerSerde();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
new HllSketchModule().getJacksonModules(), config, tempFolder); new HllSketchModule().getJacksonModules(), config, tempFolder);
this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
} }
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder() public static Collection<?> constructorFeeder()
{ {
final List<Object[]> constructors = new ArrayList<>(); final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[]{config}); for (String vectorize : new String[]{"false", "true", "force"}) {
constructors.add(new Object[]{config, vectorize});
}
} }
return constructors; return constructors;
} }
@ -224,10 +229,32 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
) )
.setPostAggregatorSpecs( .setPostAggregatorSpecs(
ImmutableList.of( ImmutableList.of(
new HllSketchToEstimatePostAggregator("estimate", new FieldAccessPostAggregator("f1", "sketch"), false), new HllSketchToEstimatePostAggregator(
new HllSketchToEstimateWithBoundsPostAggregator("estimateWithBounds", new FieldAccessPostAggregator("f1", "sketch"), 2), "estimate",
new HllSketchToStringPostAggregator("summary", new FieldAccessPostAggregator("f1", "sketch")), new FieldAccessPostAggregator("f1", "sketch"),
new HllSketchUnionPostAggregator("union", ImmutableList.of(new FieldAccessPostAggregator("f1", "sketch"), new FieldAccessPostAggregator("f2", "sketch")), null, null) false
),
new HllSketchToEstimateWithBoundsPostAggregator(
"estimateWithBounds",
new FieldAccessPostAggregator(
"f1",
"sketch"
),
2
),
new HllSketchToStringPostAggregator(
"summary",
new FieldAccessPostAggregator("f1", "sketch")
),
new HllSketchUnionPostAggregator(
"union",
ImmutableList.of(new FieldAccessPostAggregator(
"f1",
"sketch"
), new FieldAccessPostAggregator("f2", "sketch")),
null,
null
)
) )
) )
.build() .build()
@ -320,7 +347,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
); );
} }
private static String buildGroupByQueryJson( private String buildGroupByQueryJson(
String aggregationType, String aggregationType,
String aggregationFieldName, String aggregationFieldName,
boolean aggregationRound boolean aggregationRound
@ -338,6 +365,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
.put("dimensions", Collections.emptyList()) .put("dimensions", Collections.emptyList())
.put("aggregations", Collections.singletonList(aggregation)) .put("aggregations", Collections.singletonList(aggregation))
.put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z")) .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z"))
.put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString()))
.build(); .build();
return toJson(object); return toJson(object);
} }

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -84,34 +85,60 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@RunWith(Parameterized.class)
public class HllSketchSqlAggregatorTest extends CalciteTestBase public class HllSketchSqlAggregatorTest extends CalciteTestBase
{ {
private static final String DATA_SOURCE = "foo"; private static final String DATA_SOURCE = "foo";
private static final boolean ROUND = true; private static final boolean ROUND = true;
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
);
private static QueryRunnerFactoryConglomerate conglomerate; private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser; private static Closer resourceCloser;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule @Rule
public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER); public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
private final Map<String, Object> queryContext;
private SpecificSegmentsQuerySegmentWalker walker; private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory; private SqlLifecycleFactory sqlLifecycleFactory;
public HllSketchSqlAggregatorTest(final String vectorize)
{
this.queryContext = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy",
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
}
@Parameterized.Parameters(name = "vectorize = {0}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (String vectorize : new String[]{"false", "true", "force"}) {
constructors.add(new Object[]{vectorize});
}
return constructors;
}
@BeforeClass @BeforeClass
public static void setUpClass() public static void setUpClass()
{ {
@ -207,6 +234,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
@Test @Test
public void testApproxCountDistinctHllSketch() throws Exception public void testApproxCountDistinctHllSketch() throws Exception
{ {
// Can't vectorize due to CONCAT expression.
cannotVectorize();
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
@ -222,7 +252,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -317,8 +347,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND) new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND)
) )
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(queryContext)
.build(), .build()
.withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
); );
} }
@ -327,6 +358,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
@Test @Test
public void testAvgDailyCountDistinctHllSketch() throws Exception public void testAvgDailyCountDistinctHllSketch() throws Exception
{ {
// Can't vectorize due to outer query, which runs on an inline datasource.
cannotVectorize();
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
@ -340,7 +374,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -379,11 +413,14 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
new FinalizingFieldAccessPostAggregator("a0", "a0:a") new FinalizingFieldAccessPostAggregator("a0", "a0:a")
) )
) )
.context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( .context(queryContext)
ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
"d0"
))
.build() .build()
.withOverriddenContext(
BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
"d0"
)
)
) )
) )
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@ -414,7 +451,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
) )
) )
) )
.setContext(QUERY_CONTEXT_DEFAULT) .setContext(queryContext)
.build(); .build();
Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@ -437,7 +474,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = final List<Object[]> results =
sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList(); sqlLifecycle.runSimple(sql, queryContext, DEFAULT_PARAMETERS, authenticationResult).toList();
final int expected = NullHandling.replaceWithDefault() ? 1 : 2; final int expected = NullHandling.replaceWithDefault() ? 1 : 2;
Assert.assertEquals(expected, results.size()); Assert.assertEquals(expected, results.size());
} }
@ -466,7 +503,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -598,11 +635,9 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true) new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true)
) )
) )
.context(ImmutableMap.of( .context(queryContext)
"skipEmptyBuckets", true, .build()
PlannerContext.CTX_SQL_QUERY_ID, "dummy" .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
))
.build();
// Verify query // Verify query
Assert.assertEquals(expectedQuery, actualQuery); Assert.assertEquals(expectedQuery, actualQuery);
@ -619,7 +654,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql2, sql2,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -670,13 +705,19 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0")) new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"))
) )
) )
.context(ImmutableMap.of( .context(queryContext)
"skipEmptyBuckets", true, .build()
PlannerContext.CTX_SQL_QUERY_ID, "dummy" .withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
))
.build();
// Verify query // Verify query
Assert.assertEquals(expectedQuery, actualQuery); Assert.assertEquals(expectedQuery, actualQuery);
} }
private void cannotVectorize()
{
if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
== QueryContexts.Vectorize.FORCE) {
expectedException.expectMessage("Cannot vectorize");
}
}
} }

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -81,14 +82,20 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@RunWith(Parameterized.class)
public class ThetaSketchSqlAggregatorTest extends CalciteTestBase public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
{ {
private static final String DATA_SOURCE = "foo"; private static final String DATA_SOURCE = "foo";
@ -96,9 +103,6 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
private static QueryRunnerFactoryConglomerate conglomerate; private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser; private static Closer resourceCloser;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
);
@BeforeClass @BeforeClass
public static void setUpClass() public static void setUpClass()
@ -113,15 +117,38 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
resourceCloser.close(); resourceCloser.close();
} }
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule @Rule
public QueryLogHook queryLogHook = QueryLogHook.create(); public QueryLogHook queryLogHook = QueryLogHook.create();
private final Map<String, Object> queryContext;
private SpecificSegmentsQuerySegmentWalker walker; private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory; private SqlLifecycleFactory sqlLifecycleFactory;
public ThetaSketchSqlAggregatorTest(final String vectorize)
{
this.queryContext = ImmutableMap.of(
PlannerContext.CTX_SQL_QUERY_ID, "dummy",
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
}
@Parameterized.Parameters(name = "vectorize = {0}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (String vectorize : new String[]{"false", "true", "force"}) {
constructors.add(new Object[]{vectorize});
}
return constructors;
}
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
@ -206,21 +233,30 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
@Test @Test
public void testApproxCountDistinctThetaSketch() throws Exception public void testApproxCountDistinctThetaSketch() throws Exception
{ {
// Cannot vectorize due to SUBSTRING.
cannotVectorize();
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
+ " SUM(cnt),\n" + " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n" // uppercase + " APPROX_COUNT_DISTINCT_DS_THETA(dim2),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered // uppercase
+ " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim2 <> ''),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression // lowercase; also, filtered
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n" // on native theta sketch column + " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1)),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n" // on native theta sketch column // on extractionFn
+ " APPROX_COUNT_DISTINCT_DS_THETA(SUBSTRING(dim2, 1, 1) || 'x'),\n"
// on expression
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1, 32768),\n"
// on native theta sketch column
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1)\n"
// on native theta sketch column
+ "FROM druid.foo"; + "FROM druid.foo";
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -319,8 +355,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null) new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null)
) )
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(queryContext)
.build(), .build()
.withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true)),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
); );
} }
@ -328,6 +365,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
@Test @Test
public void testAvgDailyCountDistinctThetaSketch() throws Exception public void testAvgDailyCountDistinctThetaSketch() throws Exception
{ {
// Can't vectorize due to outer query (it operates on an inlined data source, which cannot be vectorized).
cannotVectorize();
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n" final String sql = "SELECT\n"
@ -337,7 +377,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -358,7 +398,11 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of( .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(
Filtration.eternity() Filtration.eternity()
))) )))
.granularity(new PeriodGranularity(Period.days(1), null, DateTimeZone.UTC)) .granularity(new PeriodGranularity(
Period.days(1),
null,
DateTimeZone.UTC
))
.aggregators( .aggregators(
Collections.singletonList( Collections.singletonList(
new SketchMergeAggregatorFactory( new SketchMergeAggregatorFactory(
@ -373,14 +417,25 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
) )
.postAggregators( .postAggregators(
ImmutableList.of( ImmutableList.of(
new FinalizingFieldAccessPostAggregator("a0", "a0:a") new FinalizingFieldAccessPostAggregator(
"a0",
"a0:a"
)
) )
) )
.context(BaseCalciteQueryTest.getTimeseriesContextWithFloorTime( .context(queryContext)
ImmutableMap.of("skipEmptyBuckets", true, "sqlQueryId", "dummy"),
"d0"
))
.build() .build()
.withOverriddenContext(
BaseCalciteQueryTest.getTimeseriesContextWithFloorTime(
ImmutableMap.of(
"skipEmptyBuckets",
true,
"sqlQueryId",
"dummy"
),
"d0"
)
)
) )
) )
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@ -388,8 +443,8 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
.setAggregatorSpecs( .setAggregatorSpecs(
NullHandling.replaceWithDefault() NullHandling.replaceWithDefault()
? Arrays.asList( ? Arrays.asList(
new LongSumAggregatorFactory("_a0:sum", "a0"), new LongSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count") new CountAggregatorFactory("_a0:count")
) )
: Arrays.asList( : Arrays.asList(
new LongSumAggregatorFactory("_a0:sum", "a0"), new LongSumAggregatorFactory("_a0:sum", "a0"),
@ -411,7 +466,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
) )
) )
) )
.setContext(QUERY_CONTEXT_DEFAULT) .setContext(queryContext)
.build(); .build();
Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries()); Query actual = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
@ -439,7 +494,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql, sql,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -598,8 +653,9 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
null null
) )
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(queryContext)
.build(); .build()
.withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query // Verify query
@ -617,7 +673,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
// Verify results // Verify results
final List<Object[]> results = sqlLifecycle.runSimple( final List<Object[]> results = sqlLifecycle.runSimple(
sql2, sql2,
QUERY_CONTEXT_DEFAULT, queryContext,
DEFAULT_PARAMETERS, DEFAULT_PARAMETERS,
authenticationResult authenticationResult
).toList(); ).toList();
@ -664,11 +720,19 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
null null
) )
) )
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy")) .context(queryContext)
.build(); .build()
.withOverriddenContext(ImmutableMap.of("skipEmptyBuckets", true));
// Verify query // Verify query
Assert.assertEquals(expectedQuery, actualQuery); Assert.assertEquals(expectedQuery, actualQuery);
} }
private void cannotVectorize()
{
if (QueryContexts.Vectorize.fromString((String) queryContext.get(QueryContexts.VECTORIZE_KEY))
== QueryContexts.Vectorize.FORCE) {
expectedException.expectMessage("Cannot vectorize");
}
}
} }