Vectorize the DataSketches quantiles aggregator. (#11183)

* Vectorize the DataSketches quantiles aggregator.

Also removes synchronization for the BufferAggregator and VectorAggregator
implementations, since it is not necessary (similar to #11115).

Extends DoublesSketchAggregatorTest and DoublesSketchSqlAggregatorTest
to run all test cases in vectorized mode.

* Style fix.
Author: Gian Merlino (2021-05-02 16:14:21 -07:00), committed by GitHub
parent 046069f35a
commit 809e001939
12 changed files with 1050 additions and 622 deletions
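
The vectorized code paths are opt-in through the standard "vectorize" query context key. For reference, here is a minimal sketch, not part of this patch, of forcing vectorization through the withQueryContext(...) method added to AggregationTestHelper below; the ForceVectorizeSketch wrapper class is illustrative only.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper;

public class ForceVectorizeSketch
{
  // Illustrative helper (not from the patch): returns a helper whose queries carry
  // vectorize=force, mirroring the "force" case of the parameterized tests below.
  static AggregationTestHelper forceVectorize(final AggregationTestHelper helper)
  {
    return helper.withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, "force"));
  }
}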

DoublesSketchAggregatorFactory.java

@@ -32,11 +32,22 @@ import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -63,7 +74,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
public DoublesSketchAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("k") final Integer k)
@JsonProperty("k") final Integer k
)
{
this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
}
@@ -106,7 +118,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
{
if (metricFactory.getColumnCapabilities(fieldName) != null
&& ValueType.isNumeric(metricFactory.getColumnCapabilities(fieldName).getType())) {
final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
final BaseDoubleColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
if (selector instanceof NilColumnValueSelector) {
return new NoopDoublesSketchBufferAggregator();
}
@@ -119,6 +131,65 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
return new DoublesSketchMergeBufferAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
return ColumnProcessors.makeVectorProcessor(
fieldName,
new VectorColumnProcessorFactory<VectorAggregator>()
{
@Override
public VectorAggregator makeSingleValueDimensionProcessor(
ColumnCapabilities capabilities,
SingleValueDimensionVectorSelector selector
)
{
return new NoopDoublesSketchBufferAggregator();
}
@Override
public VectorAggregator makeMultiValueDimensionProcessor(
ColumnCapabilities capabilities,
MultiValueDimensionVectorSelector selector
)
{
return new NoopDoublesSketchBufferAggregator();
}
@Override
public VectorAggregator makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
@Override
public VectorAggregator makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
@Override
public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new DoublesSketchMergeVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
},
selectorFactory
);
}
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}
@Override
public Object deserialize(final Object object)
{
@@ -217,8 +288,9 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
new DoublesSketchAggregatorFactory(
fieldName,
fieldName,
k)
);
k
)
);
}
@Override
@@ -306,10 +378,10 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
public String toString()
{
return getClass().getSimpleName() + "{"
+ "name=" + name
+ ", fieldName=" + fieldName
+ ", k=" + k
+ "}";
+ "name=" + name
+ ", fieldName=" + fieldName
+ ", k=" + k
+ "}";
}
}
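
With the changes above, canVectorize(...) always returns true and factorizeVector(...) lets the column processor factory pick the implementation: numeric vector columns get DoublesSketchBuildVectorAggregator, complex (pre-aggregated sketch) columns get DoublesSketchMergeVectorAggregator, and string dimensions fall back to the no-op aggregator. The following sketch, not part of this patch, walks the resulting VectorAggregator through one batch by hand. The class name is hypothetical; the selector, k, and maxIntermediateSize are assumed to be supplied by the caller (in a real query the engine provides the selector, and maxIntermediateSize comes from getMaxIntermediateSizeWithNulls()).

// Same package as the aggregator, since its constructor is package-private.
package org.apache.druid.query.aggregation.datasketches.quantiles;

import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;

import java.nio.ByteBuffer;

class DoublesSketchVectorLifecycleSketch
{
  static DoublesSketch buildFromOneBatch(
      final VectorValueSelector selector,
      final int k,
      final int maxIntermediateSize
  )
  {
    final VectorAggregator aggregator =
        new DoublesSketchBuildVectorAggregator(selector, k, maxIntermediateSize);
    // A heap buffer is assumed to be fine here; production code uses pooled processing buffers.
    final ByteBuffer buf = ByteBuffer.allocate(maxIntermediateSize);
    aggregator.init(buf, 0);                                          // build an empty sketch at offset 0
    aggregator.aggregate(buf, 0, 0, selector.getCurrentVectorSize()); // fold the current batch of rows
    final DoublesSketch result = (DoublesSketch) aggregator.get(buf, 0);
    aggregator.close();
    return result;
  }
}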

DoublesSketchBuildBufferAggregator.java

@@ -19,60 +19,52 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class DoublesSketchBuildBufferAggregator implements BufferAggregator
{
private final ColumnValueSelector<Double> selector;
private final int size;
private final int maxIntermediateSize;
private final BaseDoubleColumnValueSelector selector;
private final DoublesSketchBuildBufferAggregatorHelper helper;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
public DoublesSketchBuildBufferAggregator(final ColumnValueSelector<Double> valueSelector, final int size,
final int maxIntermediateSize)
public DoublesSketchBuildBufferAggregator(
final BaseDoubleColumnValueSelector valueSelector,
final int size,
final int maxIntermediateSize
)
{
this.selector = valueSelector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
}
@Override
public synchronized void init(final ByteBuffer buffer, final int position)
public void init(ByteBuffer buf, int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
putSketch(buffer, position, sketch);
helper.init(buf, position);
}
@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
public void aggregate(final ByteBuffer buffer, final int position)
{
if (selector.isNull()) {
return;
}
final UpdateDoublesSketch sketch = sketches.get(buffer).get(position);
final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
sketch.update(selector.getDouble());
}
@Nullable
@Override
public synchronized Object get(final ByteBuffer buffer, final int position)
public Object get(ByteBuffer buf, int position)
{
return sketches.get(buffer).get(position).compact();
return helper.get(buf, position);
}
@Override
@@ -88,42 +80,17 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
}
@Override
public synchronized void close()
public void close()
{
sketches.clear();
memCache.clear();
helper.clear();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
@Override
public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
sketch = UpdateDoublesSketch.wrap(newRegion);
}
putSketch(newBuffer, newPosition, sketch);
final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
sketches.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}
private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
{
Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
@Override
@@ -131,5 +98,4 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
{
inspector.visit("selector", selector);
}
}

DoublesSketchBuildBufferAggregatorHelper.java (new file)

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.CompactDoublesSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class DoublesSketchBuildBufferAggregatorHelper
{
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();
public DoublesSketchBuildBufferAggregatorHelper(final int size, final int maxIntermediateSize)
{
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
}
public void init(final ByteBuffer buffer, final int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
putSketch(buffer, position, sketch);
}
public CompactDoublesSketch get(final ByteBuffer buffer, final int position)
{
return sketches.get(buffer).get(position).compact();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
sketch = UpdateDoublesSketch.wrap(newRegion);
}
putSketch(newBuffer, newPosition, sketch);
final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
sketches.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}
public void clear()
{
sketches.clear();
memCache.clear();
}
/**
* Retrieves the sketch at a particular position.
*/
public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int position)
{
return sketches.get(buf).get(position);
}
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}
private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
{
Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
}
}
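
This helper pulls the buffer-to-WritableMemory cache, the per-position sketch map, and the relocation logic out of DoublesSketchBuildBufferAggregator so the scalar and vector build aggregators can share them without synchronization. Below is a rough sketch, not part of this patch, of the relocate() contract it implements: the caller is assumed to have copied the serialized bytes to the new buffer before calling relocate(); k = 128, the explicit byte copy, and the class name are illustrative, and maxIntermediateSize should come from the aggregator factory.

package org.apache.druid.query.aggregation.datasketches.quantiles;

import java.nio.ByteBuffer;

class DoublesSketchRelocationSketch
{
  static void relocateOnePosition(final int maxIntermediateSize)
  {
    final DoublesSketchBuildBufferAggregatorHelper helper =
        new DoublesSketchBuildBufferAggregatorHelper(128, maxIntermediateSize);

    final ByteBuffer oldBuf = ByteBuffer.allocate(maxIntermediateSize);
    final ByteBuffer newBuf = ByteBuffer.allocate(maxIntermediateSize);

    helper.init(oldBuf, 0);
    helper.getSketchAtPosition(oldBuf, 0).update(1.0);

    // The engine copies the serialized bytes before asking the aggregator to relocate.
    for (int i = 0; i < maxIntermediateSize; i++) {
      newBuf.put(i, oldBuf.get(i));
    }
    helper.relocate(0, 0, oldBuf, newBuf);

    // The sketch is now tracked under the new buffer and position.
    helper.getSketchAtPosition(newBuf, 0).update(2.0);
    helper.clear();
  }
}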

DoublesSketchBuildVectorAggregator.java (new file)

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class DoublesSketchBuildVectorAggregator implements VectorAggregator
{
private final VectorValueSelector selector;
private final DoublesSketchBuildBufferAggregatorHelper helper;
DoublesSketchBuildVectorAggregator(
final VectorValueSelector selector,
final int size,
final int maxIntermediateSize
)
{
this.selector = selector;
this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
}
@Override
public void init(final ByteBuffer buf, final int position)
{
helper.init(buf, position);
}
@Override
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final double[] doubles = selector.getDoubleVector();
final boolean[] nulls = selector.getNullVector();
final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position);
for (int i = startRow; i < endRow; i++) {
if (nulls == null || !nulls[i]) {
sketch.update(doubles[i]);
}
}
}
@Override
public void aggregate(
final ByteBuffer buf,
final int numRows,
final int[] positions,
@Nullable final int[] rows,
final int positionOffset
)
{
final double[] doubles = selector.getDoubleVector();
final boolean[] nulls = selector.getNullVector();
for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;
if (nulls == null || !nulls[idx]) {
final int position = positions[i] + positionOffset;
helper.getSketchAtPosition(buf, position).update(doubles[idx]);
}
}
}
@Override
public Object get(final ByteBuffer buf, final int position)
{
return helper.get(buf, position);
}
@Override
public void relocate(final int oldPosition, final int newPosition, final ByteBuffer oldBuf, final ByteBuffer newBuf)
{
helper.relocate(oldPosition, newPosition, oldBuf, newBuf);
}
@Override
public void close()
{
helper.clear();
}
}

DoublesSketchMergeBufferAggregator.java

@@ -19,57 +19,44 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{
private final ColumnValueSelector selector;
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
private final ColumnValueSelector<DoublesSketch> selector;
private final DoublesSketchMergeBufferAggregatorHelper helper;
public DoublesSketchMergeBufferAggregator(
final ColumnValueSelector selector,
final ColumnValueSelector<DoublesSketch> selector,
final int k,
final int maxIntermediateSize)
final int maxIntermediateSize
)
{
this.selector = selector;
this.k = k;
this.maxIntermediateSize = maxIntermediateSize;
this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
}
@Override
public synchronized void init(final ByteBuffer buffer, final int position)
public void init(final ByteBuffer buffer, final int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
putUnion(buffer, position, union);
helper.init(buffer, position);
}
@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
public void aggregate(final ByteBuffer buffer, final int position)
{
final DoublesUnion union = unions.get(buffer).get(position);
DoublesSketchMergeAggregator.updateUnion(selector, union);
DoublesSketchMergeAggregator.updateUnion(selector, helper.getSketchAtPosition(buffer, position));
}
@Override
public synchronized Object get(final ByteBuffer buffer, final int position)
public Object get(final ByteBuffer buffer, final int position)
{
return unions.get(buffer).get(position).getResult();
return helper.getSketchAtPosition(buffer, position).getResult();
}
@Override
@@ -87,8 +74,7 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
@Override
public synchronized void close()
{
unions.clear();
memCache.clear();
helper.clear();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
@@ -96,31 +82,7 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
@Override
public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (union.isSameResource(oldMem)) { // union was not relocated on heap
final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
union = DoublesUnion.wrap(newMem);
}
putUnion(newBuffer, newPosition, union);
Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
unions.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}
private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
{
Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, union);
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
@Override
@@ -128,5 +90,4 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{
inspector.visit("selector", selector);
}
}

DoublesSketchMergeBufferAggregatorHelper.java (new file)

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesUnion;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregatorHelper
{
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
public DoublesSketchMergeBufferAggregatorHelper(
final int k,
final int maxIntermediateSize
)
{
this.k = k;
this.maxIntermediateSize = maxIntermediateSize;
}
public void init(final ByteBuffer buffer, final int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(region);
putUnion(buffer, position, union);
}
public Object get(final ByteBuffer buffer, final int position)
{
return unions.get(buffer).get(position).getResult();
}
public void clear()
{
unions.clear();
memCache.clear();
}
// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
DoublesUnion union = unions.get(oldBuffer).get(oldPosition);
final WritableMemory oldMem = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (union.isSameResource(oldMem)) { // union was not relocated on heap
final WritableMemory newMem = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
union = DoublesUnion.wrap(newMem);
}
putUnion(newBuffer, newPosition, union);
Int2ObjectMap<DoublesUnion> map = unions.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
unions.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}
/**
* Retrieves the sketch at a particular position.
*/
public DoublesUnion getSketchAtPosition(final ByteBuffer buf, final int position)
{
return unions.get(buf).get(position);
}
private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}
private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
{
Int2ObjectMap<DoublesUnion> map = unions.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, union);
}
}

DoublesSketchMergeVectorAggregator.java (new file)

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorObjectSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class DoublesSketchMergeVectorAggregator implements VectorAggregator
{
private final VectorObjectSelector selector;
private final DoublesSketchMergeBufferAggregatorHelper helper;
public DoublesSketchMergeVectorAggregator(
final VectorObjectSelector selector,
final int k,
final int maxIntermediateSize
)
{
this.selector = selector;
this.helper = new DoublesSketchMergeBufferAggregatorHelper(k, maxIntermediateSize);
}
@Override
public void init(ByteBuffer buf, int position)
{
helper.init(buf, position);
}
@Override
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
{
final Object[] vector = selector.getObjectVector();
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
for (int i = startRow; i < endRow; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[i];
if (sketch != null) {
union.update(sketch);
}
}
}
@Override
public void aggregate(
final ByteBuffer buf,
final int numRows,
final int[] positions,
@Nullable final int[] rows,
final int positionOffset
)
{
final Object[] vector = selector.getObjectVector();
for (int i = 0; i < numRows; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
if (sketch != null) {
final int position = positions[i] + positionOffset;
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
union.update(sketch);
}
}
}
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
return helper.get(buf, position);
}
@Override
public void close()
{
helper.clear();
}
@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
}

DoublesSketchOperations.java

@@ -41,8 +41,9 @@ public class DoublesSketchOperations
return (DoublesSketch) serializedSketch;
}
throw new ISE(
"Object is not of a type that can be deserialized to a quantiles DoublsSketch: "
+ serializedSketch.getClass());
"Object is not of a type that can be deserialized to a quantiles DoublesSketch: %s",
serializedSketch == null ? "null" : serializedSketch.getClass()
);
}
public static DoublesSketch deserializeFromBase64EncodedString(final String str)

NoopDoublesSketchBufferAggregator.java

@@ -20,20 +20,42 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class NoopDoublesSketchBufferAggregator implements BufferAggregator
public class NoopDoublesSketchBufferAggregator implements BufferAggregator, VectorAggregator
{
@Override
public void init(final ByteBuffer buf, final int position)
{
// Nothing to do.
}
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
// Nothing to do.
}
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
// Nothing to do.
}
@Override
public void aggregate(
ByteBuffer buf,
int numRows,
int[] positions,
@Nullable int[] rows,
int positionOffset
)
{
// Nothing to do.
}
@Override
@@ -54,13 +76,21 @@ public class NoopDoublesSketchBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
// Nothing to do.
}
@Override
public void close()
{
// Nothing to do.
}
@Override
public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
{
// Nothing to do.
}
}

DoublesSketchAggregatorTest.java

@@ -20,10 +20,12 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@@ -54,24 +56,29 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
public DoublesSketchAggregatorTest(final GroupByQueryConfig config)
public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize)
{
DoublesSketchModule.registerSerde();
DoublesSketchModule module = new DoublesSketchModule();
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
module.getJacksonModules(), config, tempFolder);
module.getJacksonModules(),
config,
tempFolder
).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
module.getJacksonModules(),
tempFolder
);
).withQueryContext(ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize));
}
@Parameterized.Parameters(name = "{0}")
@Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
constructors.add(new Object[]{config});
for (String vectorize : new String[]{"false", "true", "force"}) {
constructors.add(new Object[]{config, vectorize});
}
}
return constructors;
}
@@ -381,7 +388,11 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
// post agg with nulls
Object quantileObjectWithNulls = row.get(5);
Assert.assertTrue(quantileObjectWithNulls instanceof Double);
Assert.assertEquals(NullHandling.replaceWithDefault() ? 7.4 : 7.5, (double) quantileObjectWithNulls, 0.1); // median value
Assert.assertEquals(
NullHandling.replaceWithDefault() ? 7.4 : 7.5,
(double) quantileObjectWithNulls,
0.1
); // median value
// post agg with nulls
Object quantilesObjectWithNulls = row.get(6);

AggregationTestHelper.java

@@ -96,8 +96,10 @@ import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* This class provides general utility to test any druid aggregation implementation given raw data,
@@ -116,6 +118,8 @@ public class AggregationTestHelper implements Closeable
private final TemporaryFolder tempFolder;
private final Closer resourceCloser;
private final Map<String, Object> queryContext;
private AggregationTestHelper(
ObjectMapper mapper,
IndexMerger indexMerger,
@@ -124,7 +128,8 @@
QueryRunnerFactory factory,
TemporaryFolder tempFolder,
List<? extends Module> jsonModulesToRegister,
Closer resourceCloser
Closer resourceCloser,
Map<String, Object> queryContext
)
{
this.mapper = mapper;
@@ -134,6 +139,7 @@
this.factory = factory;
this.tempFolder = tempFolder;
this.resourceCloser = resourceCloser;
this.queryContext = queryContext;
for (Module mod : jsonModulesToRegister) {
mapper.registerModule(mod);
@@ -174,7 +180,8 @@
factory,
tempFolder,
jsonModulesToRegister,
closer
closer,
Collections.emptyMap()
);
}
@@ -213,7 +220,8 @@
factory,
tempFolder,
jsonModulesToRegister,
Closer.create()
Closer.create(),
Collections.emptyMap()
);
}
@@ -264,7 +272,8 @@
factory,
tempFolder,
jsonModulesToRegister,
resourceCloser
resourceCloser,
Collections.emptyMap()
);
}
@@ -307,7 +316,25 @@
factory,
tempFolder,
jsonModulesToRegister,
resourceCloser
resourceCloser,
Collections.emptyMap()
);
}
public AggregationTestHelper withQueryContext(final Map<String, Object> queryContext)
{
final Map<String, Object> newContext = new HashMap<>(this.queryContext);
newContext.putAll(queryContext);
return new AggregationTestHelper(
mapper,
indexMerger,
indexIO,
toolChest,
factory,
tempFolder,
Collections.emptyList(),
resourceCloser,
newContext
);
}
@@ -658,7 +685,7 @@
//from each segment, later deserialize and merge and finally return the results
public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final String queryJson)
{
return runQueryOnSegments(segmentDirs, readQuery(queryJson));
return runQueryOnSegments(segmentDirs, readQuery(queryJson).withOverriddenContext(queryContext));
}
public <T> Sequence<T> runQueryOnSegments(final List<File> segmentDirs, final Query<T> query)