mirror of https://github.com/apache/druid.git
Vectorized theta sketch aggregator + rework of VectorColumnProcessorFactory. (#10767)
* Vectorized theta sketch aggregator. Also a refactoring of BufferAggregator and VectorAggregator such that they share a common interface, BaseBufferAggregator. This allows implementing both in the same file with an abstract + dual subclass structure. * Rework implementation to use composition instead of inheritance. * Rework things to enable working properly for both complex types and regular types. Involved finally moving makeVectorProcessor from DimensionHandlerUtils into ColumnProcessors and harmonizing the two things. * Add missing method. * Style and name changes. * Fix issues from inspections. * Fix style issue.
This commit is contained in:
parent
0e4750bac2
commit
6c0c6e60b3
|
@ -166,6 +166,11 @@
|
|||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>nl.jqno.equalsverifier</groupId>
|
||||
<artifactId>equalsverifier</artifactId>
|
||||
|
@ -178,6 +183,12 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-hll</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
|
|
|
@ -31,9 +31,12 @@ import org.apache.druid.query.aggregation.Aggregator;
|
|||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.BaseObjectColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -77,6 +80,18 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
|
|||
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canVectorize(ColumnInspector columnInspector)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
|
|
|
@ -19,39 +19,29 @@
|
|||
|
||||
package org.apache.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||
import org.apache.datasketches.Family;
|
||||
import org.apache.datasketches.memory.WritableMemory;
|
||||
import org.apache.datasketches.theta.SetOperation;
|
||||
import org.apache.datasketches.theta.Union;
|
||||
import org.apache.druid.query.aggregation.BufferAggregator;
|
||||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import org.apache.druid.segment.BaseObjectColumnValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.IdentityHashMap;
|
||||
|
||||
public class SketchBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final BaseObjectColumnValueSelector selector;
|
||||
private final int size;
|
||||
private final int maxIntermediateSize;
|
||||
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
|
||||
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
|
||||
private final SketchBufferAggregatorHelper helper;
|
||||
|
||||
public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize)
|
||||
{
|
||||
this.selector = selector;
|
||||
this.size = size;
|
||||
this.maxIntermediateSize = maxIntermediateSize;
|
||||
this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
createNewUnion(buf, position, false);
|
||||
helper.init(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -62,49 +52,16 @@ public class SketchBufferAggregator implements BufferAggregator
|
|||
return;
|
||||
}
|
||||
|
||||
Union union = getOrCreateUnion(buf, position);
|
||||
Union union = helper.getOrCreateUnion(buf, position);
|
||||
SketchAggregator.updateUnion(union, update);
|
||||
}
|
||||
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
Union union = unionMap != null ? unionMap.get(position) : null;
|
||||
if (union == null) {
|
||||
return SketchHolder.EMPTY;
|
||||
}
|
||||
//in the code below, I am returning SetOp.getResult(true, null)
|
||||
//"true" returns an ordered sketch but slower to compute than unordered sketch.
|
||||
//however, advantage of ordered sketch is that they are faster to "union" later
|
||||
//given that results from the aggregator will be combined further, it is better
|
||||
//to return the ordered sketch here
|
||||
return SketchHolder.of(union.getResult(true, null));
|
||||
}
|
||||
|
||||
private Union getOrCreateUnion(ByteBuffer buf, int position)
|
||||
{
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
Union union = unionMap != null ? unionMap.get(position) : null;
|
||||
if (union != null) {
|
||||
return union;
|
||||
}
|
||||
return createNewUnion(buf, position, true);
|
||||
}
|
||||
|
||||
private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
|
||||
{
|
||||
WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
|
||||
Union union = isWrapped
|
||||
? (Union) SetOperation.wrap(mem)
|
||||
: (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
if (unionMap == null) {
|
||||
unionMap = new Int2ObjectOpenHashMap<>();
|
||||
unions.put(buf, unionMap);
|
||||
}
|
||||
unionMap.put(position, union);
|
||||
return union;
|
||||
return helper.get(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,8 +85,7 @@ public class SketchBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public void close()
|
||||
{
|
||||
unions.clear();
|
||||
memCache.clear();
|
||||
helper.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -141,25 +97,6 @@ public class SketchBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
|
||||
{
|
||||
createNewUnion(newBuffer, newPosition, true);
|
||||
Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
|
||||
if (unionMap != null) {
|
||||
unionMap.remove(oldPosition);
|
||||
if (unionMap.isEmpty()) {
|
||||
unions.remove(oldBuffer);
|
||||
memCache.remove(oldBuffer);
|
||||
}
|
||||
}
|
||||
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
|
||||
}
|
||||
|
||||
private WritableMemory getMemory(ByteBuffer buffer)
|
||||
{
|
||||
WritableMemory mem = memCache.get(buffer);
|
||||
if (mem == null) {
|
||||
mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
|
||||
memCache.put(buffer, mem);
|
||||
}
|
||||
return mem;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||
import org.apache.datasketches.Family;
|
||||
import org.apache.datasketches.memory.WritableMemory;
|
||||
import org.apache.datasketches.theta.SetOperation;
|
||||
import org.apache.datasketches.theta.Union;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.IdentityHashMap;
|
||||
|
||||
/**
|
||||
* A helper class used by {@link SketchBufferAggregator} and {@link SketchVectorAggregator}
|
||||
* for aggregation operations on byte buffers. Getting the object from value selectors is outside this class.
|
||||
*/
|
||||
final class SketchBufferAggregatorHelper
|
||||
{
|
||||
private final int size;
|
||||
private final int maxIntermediateSize;
|
||||
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
|
||||
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
|
||||
|
||||
public SketchBufferAggregatorHelper(final int size, final int maxIntermediateSize)
|
||||
{
|
||||
this.size = size;
|
||||
this.maxIntermediateSize = maxIntermediateSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
|
||||
* {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
|
||||
*/
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
createNewUnion(buf, position, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
|
||||
* {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
|
||||
*/
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
Union union = unionMap != null ? unionMap.get(position) : null;
|
||||
if (union == null) {
|
||||
return SketchHolder.EMPTY;
|
||||
}
|
||||
//in the code below, I am returning SetOp.getResult(true, null)
|
||||
//"true" returns an ordered sketch but slower to compute than unordered sketch.
|
||||
//however, advantage of ordered sketch is that they are faster to "union" later
|
||||
//given that results from the aggregator will be combined further, it is better
|
||||
//to return the ordered sketch here
|
||||
return SketchHolder.of(union.getResult(true, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
|
||||
* {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
|
||||
*/
|
||||
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
|
||||
{
|
||||
createNewUnion(newBuffer, newPosition, true);
|
||||
Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
|
||||
if (unionMap != null) {
|
||||
unionMap.remove(oldPosition);
|
||||
if (unionMap.isEmpty()) {
|
||||
unions.remove(oldBuffer);
|
||||
memCache.remove(oldBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Union} associated with a particular buffer location.
|
||||
*
|
||||
* The Union object will be cached in this helper until {@link #close()} is called.
|
||||
*/
|
||||
public Union getOrCreateUnion(ByteBuffer buf, int position)
|
||||
{
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
Union union = unionMap != null ? unionMap.get(position) : null;
|
||||
if (union != null) {
|
||||
return union;
|
||||
}
|
||||
return createNewUnion(buf, position, true);
|
||||
}
|
||||
|
||||
private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
|
||||
{
|
||||
WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
|
||||
Union union = isWrapped
|
||||
? (Union) SetOperation.wrap(mem)
|
||||
: (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
|
||||
Int2ObjectMap<Union> unionMap = unions.get(buf);
|
||||
if (unionMap == null) {
|
||||
unionMap = new Int2ObjectOpenHashMap<>();
|
||||
unions.put(buf, unionMap);
|
||||
}
|
||||
unionMap.put(position, union);
|
||||
return union;
|
||||
}
|
||||
|
||||
public void close()
|
||||
{
|
||||
unions.clear();
|
||||
memCache.clear();
|
||||
}
|
||||
|
||||
private WritableMemory getMemory(ByteBuffer buffer)
|
||||
{
|
||||
WritableMemory mem = memCache.get(buffer);
|
||||
if (mem == null) {
|
||||
mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
|
||||
memCache.put(buffer, mem);
|
||||
}
|
||||
return mem;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import org.apache.datasketches.theta.Union;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class SketchVectorAggregator implements VectorAggregator
|
||||
{
|
||||
private final Supplier<Object[]> toObjectProcessor;
|
||||
private final SketchBufferAggregatorHelper helper;
|
||||
|
||||
public SketchVectorAggregator(
|
||||
VectorColumnSelectorFactory columnSelectorFactory,
|
||||
String column,
|
||||
int size,
|
||||
int maxIntermediateSize
|
||||
)
|
||||
{
|
||||
this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
|
||||
this.toObjectProcessor =
|
||||
ColumnProcessors.makeVectorProcessor(
|
||||
column,
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
columnSelectorFactory
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(final ByteBuffer buf, final int position)
|
||||
{
|
||||
helper.init(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
|
||||
{
|
||||
final Union union = helper.getOrCreateUnion(buf, position);
|
||||
final Object[] vector = toObjectProcessor.get();
|
||||
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
final Object o = vector[i];
|
||||
if (o != null) {
|
||||
SketchAggregator.updateUnion(union, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(
|
||||
final ByteBuffer buf,
|
||||
final int numRows,
|
||||
final int[] positions,
|
||||
@Nullable final int[] rows,
|
||||
final int positionOffset
|
||||
)
|
||||
{
|
||||
final Object[] vector = toObjectProcessor.get();
|
||||
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
final Object o = vector[rows != null ? rows[i] : i];
|
||||
|
||||
if (o != null) {
|
||||
final int position = positions[i] + positionOffset;
|
||||
final Union union = helper.getOrCreateUnion(buf, position);
|
||||
SketchAggregator.updateUnion(union, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return helper.get(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
|
||||
{
|
||||
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
helper.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.datasketches.util;
|
||||
|
||||
import org.apache.druid.segment.DimensionSelector;
|
||||
import org.apache.druid.segment.VectorColumnProcessorFactory;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Builds vector processors that return Object arrays. Not a terribly efficient way to write aggregators, since this
|
||||
* is fighting against the strongly-typed design of the vector processing system. However, it simplifies the aggregator
|
||||
* code quite a bit, and most of the sketches that use this don't have special handling for primitive types anyway, so
|
||||
* we hopefully shouldn't lose much performance.
|
||||
*/
|
||||
public class ToObjectVectorColumnProcessorFactory implements VectorColumnProcessorFactory<Supplier<Object[]>>
|
||||
{
|
||||
public static final ToObjectVectorColumnProcessorFactory INSTANCE = new ToObjectVectorColumnProcessorFactory();
|
||||
|
||||
private ToObjectVectorColumnProcessorFactory()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeSingleValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
SingleValueDimensionVectorSelector selector
|
||||
)
|
||||
{
|
||||
final Object[] retVal = new Object[selector.getMaxVectorSize()];
|
||||
|
||||
return () -> {
|
||||
final int[] values = selector.getRowVector();
|
||||
|
||||
for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
|
||||
retVal[i] = selector.lookupName(values[i]);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeMultiValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
MultiValueDimensionVectorSelector selector
|
||||
)
|
||||
{
|
||||
final Object[] retVal = new Object[selector.getMaxVectorSize()];
|
||||
|
||||
return () -> {
|
||||
final IndexedInts[] values = selector.getRowVector();
|
||||
|
||||
for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
|
||||
retVal[i] = DimensionSelector.rowToObject(values[i], selector);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
final Object[] retVal = new Object[selector.getMaxVectorSize()];
|
||||
|
||||
return () -> {
|
||||
final float[] values = selector.getFloatVector();
|
||||
final boolean[] nulls = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
|
||||
retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
final Object[] retVal = new Object[selector.getMaxVectorSize()];
|
||||
|
||||
return () -> {
|
||||
final double[] values = selector.getDoubleVector();
|
||||
final boolean[] nulls = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
|
||||
retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
|
||||
{
|
||||
final Object[] retVal = new Object[selector.getMaxVectorSize()];
|
||||
|
||||
return () -> {
|
||||
final long[] values = selector.getLongVector();
|
||||
final boolean[] nulls = selector.getNullVector();
|
||||
|
||||
for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
|
||||
retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Object[]> makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
|
||||
{
|
||||
return selector::getObjectVector;
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.DateTimes;
|
|||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.aggregation.AggregationTestHelper;
|
||||
import org.apache.druid.query.aggregation.Aggregator;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
|
@ -71,26 +73,30 @@ import java.util.stream.Collectors;
|
|||
public class SketchAggregationTest
|
||||
{
|
||||
private final AggregationTestHelper helper;
|
||||
private final QueryContexts.Vectorize vectorize;
|
||||
|
||||
@Rule
|
||||
public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
public SketchAggregationTest(final GroupByQueryConfig config)
|
||||
public SketchAggregationTest(final GroupByQueryConfig config, final String vectorize)
|
||||
{
|
||||
SketchModule.registerSerde();
|
||||
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
new SketchModule().getJacksonModules(),
|
||||
config,
|
||||
tempFolder
|
||||
);
|
||||
this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
@Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
|
||||
public static Collection<?> constructorFeeder()
|
||||
{
|
||||
final List<Object[]> constructors = new ArrayList<>();
|
||||
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
|
||||
constructors.add(new Object[]{config});
|
||||
for (String vectorize : new String[]{"false", "force"}) {
|
||||
constructors.add(new Object[]{config, vectorize});
|
||||
}
|
||||
}
|
||||
return constructors;
|
||||
}
|
||||
|
@ -104,9 +110,8 @@ public class SketchAggregationTest
|
|||
@Test
|
||||
public void testSketchDataIngestAndGpByQuery() throws Exception
|
||||
{
|
||||
final String groupByQueryString = readFileFromClasspathAsString("sketch_test_data_group_by_query.json");
|
||||
final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
|
||||
.readValue(groupByQueryString, Query.class);
|
||||
final GroupByQuery groupByQuery =
|
||||
readQueryFromClasspath("sketch_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
|
||||
|
||||
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()),
|
||||
|
@ -115,7 +120,7 @@ public class SketchAggregationTest
|
|||
0,
|
||||
Granularities.NONE,
|
||||
1000,
|
||||
groupByQueryString
|
||||
groupByQuery
|
||||
);
|
||||
|
||||
final String expectedSummary = "\n### HeapCompactOrderedSketch SUMMARY: \n"
|
||||
|
@ -164,9 +169,8 @@ public class SketchAggregationTest
|
|||
@Test
|
||||
public void testEmptySketchAggregateCombine() throws Exception
|
||||
{
|
||||
final String groupByQueryString = readFileFromClasspathAsString("empty_sketch_group_by_query.json");
|
||||
final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
|
||||
.readValue(groupByQueryString, Query.class);
|
||||
final GroupByQuery groupByQuery =
|
||||
readQueryFromClasspath("empty_sketch_group_by_query.json", helper.getObjectMapper(), vectorize);
|
||||
|
||||
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(SketchAggregationTest.class.getClassLoader().getResource("empty_sketch_data.tsv").getFile()),
|
||||
|
@ -175,7 +179,7 @@ public class SketchAggregationTest
|
|||
0,
|
||||
Granularities.NONE,
|
||||
5,
|
||||
groupByQueryString
|
||||
groupByQuery
|
||||
);
|
||||
|
||||
List<ResultRow> results = seq.toList();
|
||||
|
@ -199,9 +203,8 @@ public class SketchAggregationTest
|
|||
@Test
|
||||
public void testThetaCardinalityOnSimpleColumn() throws Exception
|
||||
{
|
||||
final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json");
|
||||
final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
|
||||
.readValue(groupByQueryString, Query.class);
|
||||
final GroupByQuery groupByQuery =
|
||||
readQueryFromClasspath("simple_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
|
||||
|
||||
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(SketchAggregationTest.class.getClassLoader().getResource("simple_test_data.tsv").getFile()),
|
||||
|
@ -215,7 +218,7 @@ public class SketchAggregationTest
|
|||
0,
|
||||
Granularities.NONE,
|
||||
1000,
|
||||
groupByQueryString
|
||||
groupByQuery
|
||||
);
|
||||
|
||||
List<ResultRow> results = seq.toList();
|
||||
|
@ -426,9 +429,8 @@ public class SketchAggregationTest
|
|||
@Test
|
||||
public void testRetentionDataIngestAndGpByQuery() throws Exception
|
||||
{
|
||||
final String groupByQueryString = readFileFromClasspathAsString("retention_test_data_group_by_query.json");
|
||||
final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
|
||||
.readValue(groupByQueryString, Query.class);
|
||||
final GroupByQuery groupByQuery =
|
||||
readQueryFromClasspath("retention_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
|
||||
|
||||
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(this.getClass().getClassLoader().getResource("retention_test_data.tsv").getFile()),
|
||||
|
@ -437,7 +439,7 @@ public class SketchAggregationTest
|
|||
0,
|
||||
Granularities.NONE,
|
||||
5,
|
||||
groupByQueryString
|
||||
groupByQuery
|
||||
);
|
||||
|
||||
List<ResultRow> results = seq.toList();
|
||||
|
@ -538,6 +540,19 @@ public class SketchAggregationTest
|
|||
);
|
||||
}
|
||||
|
||||
public static <T, Q extends Query<T>> Q readQueryFromClasspath(
|
||||
final String fileName,
|
||||
final ObjectMapper objectMapper,
|
||||
final QueryContexts.Vectorize vectorize
|
||||
) throws IOException
|
||||
{
|
||||
final String queryString = readFileFromClasspathAsString(fileName);
|
||||
|
||||
//noinspection unchecked
|
||||
return (Q) objectMapper.readValue(queryString, Query.class)
|
||||
.withOverriddenContext(ImmutableMap.of("vectorize", vectorize.toString()));
|
||||
}
|
||||
|
||||
public static String readFileFromClasspathAsString(String fileName) throws IOException
|
||||
{
|
||||
return Files.asCharSource(
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.DateTimes;
|
|||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.Result;
|
||||
import org.apache.druid.query.aggregation.AggregationTestHelper;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
|
@ -63,22 +64,26 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
|
|||
public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private final GroupByQueryConfig config;
|
||||
private final QueryContexts.Vectorize vectorize;
|
||||
|
||||
private SketchModule sm;
|
||||
private File s1;
|
||||
private File s2;
|
||||
|
||||
public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config)
|
||||
public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config, String vectorize)
|
||||
{
|
||||
this.config = config;
|
||||
this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
@Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
|
||||
public static Collection<?> constructorFeeder()
|
||||
{
|
||||
final List<Object[]> constructors = new ArrayList<>();
|
||||
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
|
||||
constructors.add(new Object[]{config});
|
||||
for (String vectorize : new String[]{"false", "force"}) {
|
||||
constructors.add(new Object[]{config, vectorize});
|
||||
}
|
||||
}
|
||||
return constructors;
|
||||
}
|
||||
|
@ -130,14 +135,15 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
|
|||
tempFolder
|
||||
)
|
||||
) {
|
||||
final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json");
|
||||
final GroupByQuery groupByQuery = (GroupByQuery) gpByQueryAggregationTestHelper
|
||||
.getObjectMapper()
|
||||
.readValue(groupByQueryString, Query.class);
|
||||
final GroupByQuery groupByQuery = SketchAggregationTest.readQueryFromClasspath(
|
||||
"simple_test_data_group_by_query.json",
|
||||
gpByQueryAggregationTestHelper.getObjectMapper(),
|
||||
vectorize
|
||||
);
|
||||
|
||||
Sequence<ResultRow> seq = gpByQueryAggregationTestHelper.runQueryOnSegments(
|
||||
ImmutableList.of(s1, s2),
|
||||
groupByQueryString
|
||||
groupByQuery
|
||||
);
|
||||
|
||||
List<MapBasedRow> results = seq.map(row -> row.toMapBasedRow(groupByQuery)).toList();
|
||||
|
@ -225,7 +231,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
|
|||
|
||||
Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments(
|
||||
ImmutableList.of(s1, s2),
|
||||
readFileFromClasspathAsString("timeseries_query.json")
|
||||
(Query) SketchAggregationTest.readQueryFromClasspath(
|
||||
"timeseries_query.json",
|
||||
timeseriesQueryAggregationTestHelper.getObjectMapper(),
|
||||
vectorize
|
||||
)
|
||||
);
|
||||
|
||||
Result<TimeseriesResultValue> result = (Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList());
|
||||
|
@ -251,7 +261,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
|
|||
|
||||
Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments(
|
||||
ImmutableList.of(s1, s2),
|
||||
readFileFromClasspathAsString("topn_query.json")
|
||||
(Query) SketchAggregationTest.readQueryFromClasspath(
|
||||
"topn_query.json",
|
||||
topNQueryAggregationTestHelper.getObjectMapper(),
|
||||
vectorize
|
||||
)
|
||||
);
|
||||
|
||||
Result<TopNResultValue> result = (Result<TopNResultValue>) Iterables.getOnlyElement(seq.toList());
|
||||
|
@ -278,7 +292,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
|
|||
|
||||
Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments(
|
||||
ImmutableList.of(s1, s2),
|
||||
readFileFromClasspathAsString("topn_query_sketch_const.json")
|
||||
(Query) SketchAggregationTest.readQueryFromClasspath(
|
||||
"topn_query_sketch_const.json",
|
||||
topNQueryAggregationTestHelper.getObjectMapper(),
|
||||
vectorize
|
||||
)
|
||||
);
|
||||
|
||||
Result<TopNResultValue> result = (Result<TopNResultValue>) Iterables.getOnlyElement(seq.toList());
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.datasketches.util;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.druid.hll.HyperLogLogCollector;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.TestIndex;
|
||||
import org.apache.druid.segment.VirtualColumns;
|
||||
import org.apache.druid.segment.vector.VectorCursor;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ToObjectVectorColumnProcessorFactoryTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private StorageAdapter adapter;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
final QueryableIndex index = TestIndex.getMMappedTestIndex();
|
||||
adapter = new QueryableIndexStorageAdapter(index);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRead()
|
||||
{
|
||||
try (final VectorCursor cursor = makeCursor()) {
|
||||
final Supplier<Object[]> qualitySupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"quality",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final Supplier<Object[]> qualityLongSupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"qualityLong",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final Supplier<Object[]> qualityFloatSupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"qualityFloat",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final Supplier<Object[]> qualityDoubleSupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"qualityDouble",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final Supplier<Object[]> placementSupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"placement",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final Supplier<Object[]> qualityUniquesSupplier = ColumnProcessors.makeVectorProcessor(
|
||||
"quality_uniques",
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testString()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(
|
||||
"automotive",
|
||||
"business",
|
||||
"entertainment",
|
||||
"health",
|
||||
"mezzanine",
|
||||
"news",
|
||||
"premium",
|
||||
"technology",
|
||||
"travel",
|
||||
"mezzanine"
|
||||
),
|
||||
readColumn("quality", 10)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLong()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(1000L, 1100L, 1200L, 1300L, 1400L, 1500L, 1600L, 1700L, 1800L, 1400L),
|
||||
readColumn("qualityLong", 10)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFloat()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(10000f, 11000f, 12000f, 13000f, 14000f, 15000f, 16000f, 17000f, 18000f, 14000f),
|
||||
readColumn("qualityFloat", 10)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDouble()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(10000.0, 11000.0, 12000.0, 13000.0, 14000.0, 15000.0, 16000.0, 17000.0, 18000.0, 14000.0),
|
||||
readColumn("qualityDouble", 10)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiString()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(
|
||||
Arrays.asList("a", "preferred"),
|
||||
Arrays.asList("b", "preferred"),
|
||||
Arrays.asList("e", "preferred"),
|
||||
Arrays.asList("h", "preferred"),
|
||||
Arrays.asList("m", "preferred"),
|
||||
Arrays.asList("n", "preferred"),
|
||||
Arrays.asList("p", "preferred"),
|
||||
Arrays.asList("preferred", "t"),
|
||||
Arrays.asList("preferred", "t"),
|
||||
Arrays.asList("m", "preferred")
|
||||
),
|
||||
readColumn("placementish", 10)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplexSketch()
|
||||
{
|
||||
final Object sketch = Iterables.getOnlyElement(readColumn("quality_uniques", 1));
|
||||
Assert.assertThat(sketch, CoreMatchers.instanceOf(HyperLogLogCollector.class));
|
||||
}
|
||||
|
||||
private VectorCursor makeCursor()
|
||||
{
|
||||
return adapter.makeVectorCursor(
|
||||
null,
|
||||
Intervals.ETERNITY,
|
||||
VirtualColumns.EMPTY,
|
||||
false,
|
||||
3, /* vector size */
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private List<Object> readColumn(final String column, final int limit)
|
||||
{
|
||||
try (final VectorCursor cursor = makeCursor()) {
|
||||
final Supplier<Object[]> supplier = ColumnProcessors.makeVectorProcessor(
|
||||
column,
|
||||
ToObjectVectorColumnProcessorFactory.INSTANCE,
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
||||
final List<Object> retVal = new ArrayList<>();
|
||||
|
||||
while (!cursor.isDone()) {
|
||||
final Object[] objects = supplier.get();
|
||||
|
||||
for (int i = 0; i < cursor.getCurrentVectorSize(); i++) {
|
||||
retVal.add(objects[i]);
|
||||
|
||||
if (retVal.size() >= limit) {
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
cursor.advance();
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -55,6 +55,7 @@ import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
|||
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
|
||||
import org.apache.druid.query.lookup.LookupExtractionFn;
|
||||
import org.apache.druid.query.lookup.LookupExtractor;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
|
@ -312,7 +313,7 @@ public class InDimFilter extends AbstractOptimizableDimFilter implements Filter
|
|||
@Override
|
||||
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
|
||||
{
|
||||
return DimensionHandlerUtils.makeVectorProcessor(
|
||||
return ColumnProcessors.makeVectorProcessor(
|
||||
dimension,
|
||||
VectorValueMatcherColumnProcessorFactory.instance(),
|
||||
factory
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.filter.vector;
|
||||
|
||||
import org.apache.druid.query.filter.DruidPredicateFactory;
|
||||
import org.apache.druid.segment.vector.VectorSizeInspector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Treats all rows as null.
|
||||
*/
|
||||
public class NilVectorValueMatcher implements VectorValueMatcherFactory
|
||||
{
|
||||
private final VectorSizeInspector vectorInspector;
|
||||
|
||||
public NilVectorValueMatcher(final VectorSizeInspector vectorInspector)
|
||||
{
|
||||
this.vectorInspector = vectorInspector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueMatcher makeMatcher(@Nullable String value)
|
||||
{
|
||||
return BooleanVectorValueMatcher.of(vectorInspector, value == null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueMatcher makeMatcher(DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
return BooleanVectorValueMatcher.of(vectorInspector, predicateFactory.makeStringPredicate().apply(null));
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.apache.druid.segment.VectorColumnProcessorFactory;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory<VectorValueMatcherFactory>
|
||||
|
@ -83,4 +84,13 @@ public class VectorValueMatcherColumnProcessorFactory implements VectorColumnPro
|
|||
{
|
||||
return new LongVectorValueMatcher(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueMatcherFactory makeObjectProcessor(
|
||||
final ColumnCapabilities capabilities,
|
||||
final VectorObjectSelector selector
|
||||
)
|
||||
{
|
||||
return new NilVectorValueMatcher(selector);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
|
|||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
public class GroupByVectorColumnProcessorFactory implements VectorColumnProcessorFactory<GroupByVectorColumnSelector>
|
||||
|
@ -102,4 +103,13 @@ public class GroupByVectorColumnProcessorFactory implements VectorColumnProcesso
|
|||
}
|
||||
return new NullableLongGroupByVectorColumnSelector(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GroupByVectorColumnSelector makeObjectProcessor(
|
||||
final ColumnCapabilities capabilities,
|
||||
final VectorObjectSelector selector
|
||||
)
|
||||
{
|
||||
return NilGroupByVectorColumnSelector.INSTANCE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public interface GroupByVectorColumnSelector
|
|||
* Write key parts for this column into a particular result row.
|
||||
*
|
||||
* @param keyMemory key memory
|
||||
* @param keyOffset starting position for this key part within keyMemory
|
||||
* @param keyOffset starting position for this key part within keyMemory
|
||||
* @param resultRow result row to receive key parts
|
||||
* @param resultRowPosition position within the result row for this key part
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.groupby.epinephelinae.vector;
|
||||
|
||||
import org.apache.datasketches.memory.Memory;
|
||||
import org.apache.datasketches.memory.WritableMemory;
|
||||
import org.apache.druid.query.groupby.ResultRow;
|
||||
|
||||
/**
|
||||
* Treats all rows as null.
|
||||
*/
|
||||
public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelector
|
||||
{
|
||||
public static final NilGroupByVectorColumnSelector INSTANCE = new NilGroupByVectorColumnSelector();
|
||||
|
||||
private NilGroupByVectorColumnSelector()
|
||||
{
|
||||
// Singleton.
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getGroupingKeySize()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow)
|
||||
{
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeKeyToResultRow(Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition)
|
||||
{
|
||||
resultRow.set(resultRowPosition, null);
|
||||
}
|
||||
}
|
|
@ -42,7 +42,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
|
|||
import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
|
||||
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
|
||||
import org.apache.druid.query.vector.VectorCursorGranularizer;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.VirtualColumns;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
|
@ -175,7 +175,7 @@ public class VectorGroupByEngine
|
|||
final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
|
||||
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
|
||||
dimensionSpec ->
|
||||
DimensionHandlerUtils.makeVectorProcessor(
|
||||
ColumnProcessors.makeVectorProcessor(
|
||||
dimensionSpec,
|
||||
GroupByVectorColumnProcessorFactory.instance(),
|
||||
columnSelectorFactory
|
||||
|
|
|
@ -29,6 +29,12 @@ import org.apache.druid.query.extraction.ExtractionFn;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.NilVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.segment.virtual.ExpressionSelectors;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -38,11 +44,20 @@ import javax.annotation.Nullable;
|
|||
* top of it.
|
||||
*
|
||||
* @see DimensionHandlerUtils#createColumnSelectorPlus which this may eventually replace
|
||||
* @see DimensionHandlerUtils#makeVectorProcessor which creates similar, vectorized processors; may eventually be moved
|
||||
* into this class.
|
||||
*/
|
||||
public class ColumnProcessors
|
||||
{
|
||||
/**
|
||||
* Capabilites that are used when we return a nil selector for a nonexistent column.
|
||||
*/
|
||||
public static final ColumnCapabilities NIL_COLUMN_CAPABILITIES =
|
||||
new ColumnCapabilitiesImpl().setType(ValueType.STRING)
|
||||
.setDictionaryEncoded(true)
|
||||
.setDictionaryValuesUnique(true)
|
||||
.setDictionaryValuesSorted(true)
|
||||
.setHasBitmapIndexes(false)
|
||||
.setHasMultipleValues(false);
|
||||
|
||||
/**
|
||||
* Make a processor for a particular named column.
|
||||
*
|
||||
|
@ -81,25 +96,10 @@ public class ColumnProcessors
|
|||
)
|
||||
{
|
||||
return makeProcessorInternal(
|
||||
factory -> {
|
||||
// Capabilities of the column that the dimensionSpec is reading. We can't return these straight-up, because
|
||||
// the _result_ of the dimensionSpec might have different capabilities. But what we return will generally be
|
||||
// based on them.
|
||||
final ColumnCapabilities dimensionCapabilities = factory.getColumnCapabilities(dimensionSpec.getDimension());
|
||||
|
||||
if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) {
|
||||
// DimensionSpec is doing some sort of transformation. The result is always a string.
|
||||
|
||||
return new ColumnCapabilitiesImpl()
|
||||
.setType(ValueType.STRING)
|
||||
.setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering())
|
||||
.setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() == ExtractionFn.ExtractionType.ONE_TO_ONE)
|
||||
.setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(dimensionCapabilities));
|
||||
} else {
|
||||
// No transformation. Pass through.
|
||||
return dimensionCapabilities;
|
||||
}
|
||||
},
|
||||
factory -> computeDimensionSpecCapabilities(
|
||||
dimensionSpec,
|
||||
factory.getColumnCapabilities(dimensionSpec.getDimension())
|
||||
),
|
||||
factory -> factory.makeDimensionSelector(dimensionSpec),
|
||||
factory -> factory.makeColumnValueSelector(dimensionSpec.getDimension()),
|
||||
processorFactory,
|
||||
|
@ -144,6 +144,94 @@ public class ColumnProcessors
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a processor for a particular named column.
|
||||
*
|
||||
* @param column the column
|
||||
* @param processorFactory the processor factory
|
||||
* @param selectorFactory the column selector factory
|
||||
* @param <T> processor type
|
||||
*/
|
||||
public static <T> T makeVectorProcessor(
|
||||
final String column,
|
||||
final VectorColumnProcessorFactory<T> processorFactory,
|
||||
final VectorColumnSelectorFactory selectorFactory
|
||||
)
|
||||
{
|
||||
return makeVectorProcessorInternal(
|
||||
factory -> factory.getColumnCapabilities(column),
|
||||
factory -> factory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(column)),
|
||||
factory -> factory.makeMultiValueDimensionSelector(DefaultDimensionSpec.of(column)),
|
||||
factory -> factory.makeValueSelector(column),
|
||||
factory -> factory.makeObjectSelector(column),
|
||||
processorFactory,
|
||||
selectorFactory
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a processor for a particular {@link DimensionSpec}.
|
||||
*
|
||||
* @param dimensionSpec the dimension spec
|
||||
* @param processorFactory the processor factory
|
||||
* @param selectorFactory the column selector factory
|
||||
* @param <T> processor type
|
||||
*/
|
||||
public static <T> T makeVectorProcessor(
|
||||
final DimensionSpec dimensionSpec,
|
||||
final VectorColumnProcessorFactory<T> processorFactory,
|
||||
final VectorColumnSelectorFactory selectorFactory
|
||||
)
|
||||
{
|
||||
return makeVectorProcessorInternal(
|
||||
factory -> computeDimensionSpecCapabilities(
|
||||
dimensionSpec,
|
||||
factory.getColumnCapabilities(dimensionSpec.getDimension())
|
||||
),
|
||||
factory -> factory.makeSingleValueDimensionSelector(dimensionSpec),
|
||||
factory -> factory.makeMultiValueDimensionSelector(dimensionSpec),
|
||||
factory -> factory.makeValueSelector(dimensionSpec.getDimension()),
|
||||
factory -> factory.makeObjectSelector(dimensionSpec.getDimension()),
|
||||
processorFactory,
|
||||
selectorFactory
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the capabilities of selectors derived from a particular {@link DimensionSpec}.
|
||||
*
|
||||
* Will only return non-STRING types if the DimensionSpec passes through inputs unchanged. (i.e., it's a
|
||||
* {@link DefaultDimensionSpec}, or something that behaves like one.)
|
||||
*
|
||||
* @param dimensionSpec The dimensionSpec.
|
||||
* @param columnCapabilities Capabilities of the column that the dimensionSpec is reading, i.e.
|
||||
* {@link DimensionSpec#getDimension()}.
|
||||
*/
|
||||
@Nullable
|
||||
private static ColumnCapabilities computeDimensionSpecCapabilities(
|
||||
final DimensionSpec dimensionSpec,
|
||||
@Nullable final ColumnCapabilities columnCapabilities
|
||||
)
|
||||
{
|
||||
if (dimensionSpec.mustDecorate()) {
|
||||
// Decorating DimensionSpecs could do anything. We can't pass along any useful info other than the type.
|
||||
return new ColumnCapabilitiesImpl().setType(ValueType.STRING);
|
||||
} else if (dimensionSpec.getExtractionFn() != null) {
|
||||
// DimensionSpec is applying an extractionFn but *not* decorating. We have some insight into how the
|
||||
// extractionFn will behave, so let's use it.
|
||||
|
||||
return new ColumnCapabilitiesImpl()
|
||||
.setType(ValueType.STRING)
|
||||
.setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering())
|
||||
.setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType()
|
||||
== ExtractionFn.ExtractionType.ONE_TO_ONE)
|
||||
.setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(columnCapabilities));
|
||||
} else {
|
||||
// No transformation. Pass through underlying types.
|
||||
return columnCapabilities;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates "column processors", which are objects that wrap a single input column and provide some
|
||||
* functionality on top of it.
|
||||
|
@ -158,8 +246,6 @@ public class ColumnProcessors
|
|||
* called if the column type is long, float, double, or complex.
|
||||
* @param processorFactory object that encapsulates the knowledge about how to create processors
|
||||
* @param selectorFactory column selector factory used for creating the vector processor
|
||||
*
|
||||
* @see DimensionHandlerUtils#makeVectorProcessor the vectorized version
|
||||
*/
|
||||
private static <T> T makeProcessorInternal(
|
||||
final Function<ColumnSelectorFactory, ColumnCapabilities> inputCapabilitiesFn,
|
||||
|
@ -191,6 +277,71 @@ public class ColumnProcessors
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates "column processors", which are objects that wrap a single input column and provide some
|
||||
* functionality on top of it.
|
||||
*
|
||||
* @param inputCapabilitiesFn function that returns capabilities of the column being processed. The type provided
|
||||
* by these capabilities will be used to determine what kind of selector to create. If
|
||||
* this function returns null, then it is assumed that the column does not exist.
|
||||
* Note: this is different behavior from the non-vectorized version.
|
||||
* @param singleValueDimensionSelectorFn function that creates a singly-valued dimension selector for the column being
|
||||
* processed. Will be called if the column is singly-valued string.
|
||||
* @param multiValueDimensionSelectorFn function that creates a multi-valued dimension selector for the column being
|
||||
* processed. Will be called if the column is multi-valued string.
|
||||
* @param valueSelectorFn function that creates a value selector for the column being processed. Will be
|
||||
* called if the column type is long, float, or double.
|
||||
* @param objectSelectorFn function that creates an object selector for the column being processed. Will
|
||||
* be called if the column type is complex.
|
||||
* @param processorFactory object that encapsulates the knowledge about how to create processors
|
||||
* @param selectorFactory column selector factory used for creating the vector processor
|
||||
*/
|
||||
private static <T> T makeVectorProcessorInternal(
|
||||
final Function<VectorColumnSelectorFactory, ColumnCapabilities> inputCapabilitiesFn,
|
||||
final Function<VectorColumnSelectorFactory, SingleValueDimensionVectorSelector> singleValueDimensionSelectorFn,
|
||||
final Function<VectorColumnSelectorFactory, MultiValueDimensionVectorSelector> multiValueDimensionSelectorFn,
|
||||
final Function<VectorColumnSelectorFactory, VectorValueSelector> valueSelectorFn,
|
||||
final Function<VectorColumnSelectorFactory, VectorObjectSelector> objectSelectorFn,
|
||||
final VectorColumnProcessorFactory<T> processorFactory,
|
||||
final VectorColumnSelectorFactory selectorFactory
|
||||
)
|
||||
{
|
||||
final ColumnCapabilities capabilities = inputCapabilitiesFn.apply(selectorFactory);
|
||||
|
||||
if (capabilities == null) {
|
||||
// Column does not exist.
|
||||
return processorFactory.makeSingleValueDimensionProcessor(
|
||||
NIL_COLUMN_CAPABILITIES,
|
||||
NilVectorSelector.create(selectorFactory.getReadableVectorInspector())
|
||||
);
|
||||
}
|
||||
|
||||
switch (capabilities.getType()) {
|
||||
case STRING:
|
||||
if (mayBeMultiValue(capabilities)) {
|
||||
return processorFactory.makeMultiValueDimensionProcessor(
|
||||
capabilities,
|
||||
multiValueDimensionSelectorFn.apply(selectorFactory)
|
||||
);
|
||||
} else {
|
||||
return processorFactory.makeSingleValueDimensionProcessor(
|
||||
capabilities,
|
||||
singleValueDimensionSelectorFn.apply(selectorFactory)
|
||||
);
|
||||
}
|
||||
case LONG:
|
||||
return processorFactory.makeLongProcessor(capabilities, valueSelectorFn.apply(selectorFactory));
|
||||
case FLOAT:
|
||||
return processorFactory.makeFloatProcessor(capabilities, valueSelectorFn.apply(selectorFactory));
|
||||
case DOUBLE:
|
||||
return processorFactory.makeDoubleProcessor(capabilities, valueSelectorFn.apply(selectorFactory));
|
||||
case COMPLEX:
|
||||
return processorFactory.makeObjectProcessor(capabilities, objectSelectorFn.apply(selectorFactory));
|
||||
default:
|
||||
throw new ISE("Unsupported type[%s]", capabilities.getType());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a given set of capabilities might indicate an underlying multi-value column. Errs on the side
|
||||
* of returning true if unknown; i.e. if this returns false, there are _definitely not_ mul.
|
||||
|
|
|
@ -32,13 +32,11 @@ import org.apache.druid.java.util.common.parsers.ParseException;
|
|||
import org.apache.druid.query.ColumnSelectorPlus;
|
||||
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
|
||||
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.query.extraction.ExtractionFn;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.math.BigDecimal;
|
||||
|
@ -105,7 +103,12 @@ public final class DimensionHandlerUtils
|
|||
if (!capabilities.isDictionaryEncoded().isTrue()) {
|
||||
throw new IAE("String column must have dictionary encoding.");
|
||||
}
|
||||
return new StringDimensionHandler(dimensionName, multiValueHandling, capabilities.hasBitmapIndexes(), capabilities.hasSpatialIndexes());
|
||||
return new StringDimensionHandler(
|
||||
dimensionName,
|
||||
multiValueHandling,
|
||||
capabilities.hasBitmapIndexes(),
|
||||
capabilities.hasSpatialIndexes()
|
||||
);
|
||||
}
|
||||
|
||||
if (capabilities.getType() == ValueType.LONG) {
|
||||
|
@ -146,10 +149,10 @@ public final class DimensionHandlerUtils
|
|||
* {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
|
||||
* list of dimensionSpecs and then retrieving the only element in the returned array.
|
||||
*
|
||||
* @param <Strategy> The strategy type created by the provided strategy factory.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpec column to generate a ColumnSelectorPlus object for
|
||||
* @param cursor Used to create value selectors for columns.
|
||||
* @param <Strategy> The strategy type created by the provided strategy factory.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpec column to generate a ColumnSelectorPlus object for
|
||||
* @param cursor Used to create value selectors for columns.
|
||||
*
|
||||
* @return A ColumnSelectorPlus object
|
||||
*
|
||||
|
@ -175,10 +178,10 @@ public final class DimensionHandlerUtils
|
|||
* A caller should define a strategy factory that provides an interface for type-specific operations
|
||||
* in a query engine. See GroupByStrategyFactory for a reference.
|
||||
*
|
||||
* @param <Strategy> The strategy type created by the provided strategy factory.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
|
||||
* @param columnSelectorFactory Used to create value selectors for columns.
|
||||
* @param <Strategy> The strategy type created by the provided strategy factory.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
|
||||
* @param columnSelectorFactory Used to create value selectors for columns.
|
||||
*
|
||||
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
|
||||
*
|
||||
|
@ -287,96 +290,6 @@ public final class DimensionHandlerUtils
|
|||
return strategyFactory.makeColumnSelectorStrategy(capabilities, selector);
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent to calling makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory).
|
||||
*
|
||||
* @see #makeVectorProcessor(DimensionSpec, VectorColumnProcessorFactory, VectorColumnSelectorFactory)
|
||||
* @see ColumnProcessors#makeProcessor the non-vectorized version
|
||||
*/
|
||||
public static <T> T makeVectorProcessor(
|
||||
final String column,
|
||||
final VectorColumnProcessorFactory<T> strategyFactory,
|
||||
final VectorColumnSelectorFactory selectorFactory
|
||||
)
|
||||
{
|
||||
return makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates "vector processors", which are objects that wrap a single vectorized input column and provide some
|
||||
* functionality on top of it. Used by things like query engines and filter matchers.
|
||||
*
|
||||
* Supports the basic types STRING, LONG, DOUBLE, and FLOAT.
|
||||
*
|
||||
* @param dimensionSpec dimensionSpec for the input to the processor
|
||||
* @param strategyFactory object that encapsulates the knowledge about how to create processors
|
||||
* @param selectorFactory column selector factory used for creating the vector processor
|
||||
*
|
||||
* @see ColumnProcessors#makeProcessor the non-vectorized version
|
||||
*/
|
||||
public static <T> T makeVectorProcessor(
|
||||
final DimensionSpec dimensionSpec,
|
||||
final VectorColumnProcessorFactory<T> strategyFactory,
|
||||
final VectorColumnSelectorFactory selectorFactory
|
||||
)
|
||||
{
|
||||
final ColumnCapabilities originalCapabilities =
|
||||
selectorFactory.getColumnCapabilities(dimensionSpec.getDimension());
|
||||
|
||||
final ColumnCapabilities effectiveCapabilites = getEffectiveCapabilities(
|
||||
dimensionSpec,
|
||||
originalCapabilities
|
||||
);
|
||||
|
||||
final ValueType type = effectiveCapabilites.getType();
|
||||
|
||||
// vector selectors should never have null column capabilities, these signify a non-existent column, and complex
|
||||
// columns should never be treated as a multi-value column, so always use single value string processor
|
||||
final boolean forceSingleValue =
|
||||
originalCapabilities == null || ValueType.COMPLEX.equals(originalCapabilities.getType());
|
||||
|
||||
if (type == ValueType.STRING) {
|
||||
if (!forceSingleValue && effectiveCapabilites.hasMultipleValues().isMaybeTrue()) {
|
||||
return strategyFactory.makeMultiValueDimensionProcessor(
|
||||
effectiveCapabilites,
|
||||
selectorFactory.makeMultiValueDimensionSelector(dimensionSpec)
|
||||
);
|
||||
} else {
|
||||
return strategyFactory.makeSingleValueDimensionProcessor(
|
||||
effectiveCapabilites,
|
||||
selectorFactory.makeSingleValueDimensionSelector(dimensionSpec)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
Preconditions.checkState(
|
||||
dimensionSpec.getExtractionFn() == null && !dimensionSpec.mustDecorate(),
|
||||
"Uh oh, was about to try to make a value selector for type[%s] with a dimensionSpec of class[%s] that "
|
||||
+ "requires decoration. Possible bug.",
|
||||
type,
|
||||
dimensionSpec.getClass().getName()
|
||||
);
|
||||
|
||||
if (type == ValueType.LONG) {
|
||||
return strategyFactory.makeLongProcessor(
|
||||
effectiveCapabilites,
|
||||
selectorFactory.makeValueSelector(dimensionSpec.getDimension())
|
||||
);
|
||||
} else if (type == ValueType.FLOAT) {
|
||||
return strategyFactory.makeFloatProcessor(
|
||||
effectiveCapabilites,
|
||||
selectorFactory.makeValueSelector(dimensionSpec.getDimension())
|
||||
);
|
||||
} else if (type == ValueType.DOUBLE) {
|
||||
return strategyFactory.makeDoubleProcessor(
|
||||
effectiveCapabilites,
|
||||
selectorFactory.makeValueSelector(dimensionSpec.getDimension())
|
||||
);
|
||||
} else {
|
||||
throw new ISE("Unsupported type[%s]", effectiveCapabilites.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static String convertObjectToString(@Nullable Object valObj)
|
||||
{
|
||||
|
|
|
@ -122,16 +122,25 @@ public interface DimensionSelector extends ColumnValueSelector<Object>, Dimensio
|
|||
@Nullable
|
||||
default Object defaultGetObject()
|
||||
{
|
||||
IndexedInts row = getRow();
|
||||
return rowToObject(getRow(), this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a particular {@link IndexedInts} to an Object in a standard way, assuming each element in the IndexedInts
|
||||
* is a dictionary ID that can be resolved with the provided selector.
|
||||
*/
|
||||
@Nullable
|
||||
static Object rowToObject(IndexedInts row, DimensionDictionarySelector selector)
|
||||
{
|
||||
int rowSize = row.size();
|
||||
if (rowSize == 0) {
|
||||
return null;
|
||||
} else if (rowSize == 1) {
|
||||
return lookupName(row.get(0));
|
||||
return selector.lookupName(row.get(0));
|
||||
} else {
|
||||
final String[] strings = new String[rowSize];
|
||||
for (int i = 0; i < rowSize; i++) {
|
||||
strings[i] = lookupName(row.get(i));
|
||||
strings[i] = selector.lookupName(row.get(i));
|
||||
}
|
||||
return Arrays.asList(strings);
|
||||
}
|
||||
|
|
|
@ -22,34 +22,59 @@ package org.apache.druid.segment;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
/**
|
||||
* Class that encapsulates knowledge about how to create vector column processors. Used by
|
||||
* {@link DimensionHandlerUtils#makeVectorProcessor}.
|
||||
* {@link ColumnProcessors#makeVectorProcessor}.
|
||||
*
|
||||
* Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is
|
||||
* always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns
|
||||
* are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align
|
||||
* with {@link ColumnProcessorFactory}.
|
||||
* Column processors can be any type "T". The idea is that a ColumnProcessorFactory embodies the logic for wrapping
|
||||
* and processing selectors of various types, and so enables nice code design, where type-dependent code is not
|
||||
* sprinkled throughout.
|
||||
*
|
||||
* Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method, because vector
|
||||
* column types are always known, so it isn't necessary.
|
||||
*
|
||||
* @see ColumnProcessorFactory the non-vectorized version
|
||||
*/
|
||||
public interface VectorColumnProcessorFactory<T>
|
||||
{
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value
|
||||
* per row.
|
||||
*/
|
||||
T makeSingleValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
SingleValueDimensionVectorSelector selector
|
||||
);
|
||||
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values
|
||||
* per row.
|
||||
*/
|
||||
T makeMultiValueDimensionProcessor(
|
||||
ColumnCapabilities capabilities,
|
||||
MultiValueDimensionVectorSelector selector
|
||||
);
|
||||
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is FLOAT.
|
||||
*/
|
||||
T makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
|
||||
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is DOUBLE.
|
||||
*/
|
||||
T makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
|
||||
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is LONG.
|
||||
*/
|
||||
T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
|
||||
|
||||
/**
|
||||
* Called when {@link ColumnCapabilities#getType()} is COMPLEX.
|
||||
*/
|
||||
T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector);
|
||||
}
|
||||
|
|
|
@ -41,9 +41,9 @@ import org.apache.druid.query.filter.ValueMatcher;
|
|||
import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
|
||||
import org.apache.druid.query.ordering.StringComparators;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.IntListUtils;
|
||||
import org.apache.druid.segment.column.BitmapIndex;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
@ -129,7 +129,7 @@ public class BoundFilter implements Filter
|
|||
@Override
|
||||
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
|
||||
{
|
||||
return DimensionHandlerUtils.makeVectorProcessor(
|
||||
return ColumnProcessors.makeVectorProcessor(
|
||||
boundDimFilter.getDimension(),
|
||||
VectorValueMatcherColumnProcessorFactory.instance(),
|
||||
factory
|
||||
|
|
|
@ -36,15 +36,16 @@ import org.apache.druid.query.filter.FilterTuning;
|
|||
import org.apache.druid.query.filter.ValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class DimensionPredicateFilter implements Filter
|
||||
{
|
||||
|
@ -98,7 +99,7 @@ public class DimensionPredicateFilter implements Filter
|
|||
@Override
|
||||
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
|
||||
{
|
||||
return DimensionHandlerUtils.makeVectorProcessor(
|
||||
return ColumnProcessors.makeVectorProcessor(
|
||||
dimension,
|
||||
VectorValueMatcherColumnProcessorFactory.instance(),
|
||||
factory
|
||||
|
|
|
@ -35,9 +35,9 @@ import org.apache.druid.query.filter.LikeDimFilter;
|
|||
import org.apache.druid.query.filter.ValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.column.BitmapIndex;
|
||||
import org.apache.druid.segment.data.CloseableIndexed;
|
||||
import org.apache.druid.segment.data.Indexed;
|
||||
|
@ -91,7 +91,7 @@ public class LikeFilter implements Filter
|
|||
@Override
|
||||
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
|
||||
{
|
||||
return DimensionHandlerUtils.makeVectorProcessor(
|
||||
return ColumnProcessors.makeVectorProcessor(
|
||||
dimension,
|
||||
VectorValueMatcherColumnProcessorFactory.instance(),
|
||||
factory
|
||||
|
|
|
@ -29,9 +29,9 @@ import org.apache.druid.query.filter.FilterTuning;
|
|||
import org.apache.druid.query.filter.ValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcher;
|
||||
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.ColumnSelector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -87,7 +87,7 @@ public class SelectorFilter implements Filter
|
|||
@Override
|
||||
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
|
||||
{
|
||||
return DimensionHandlerUtils.makeVectorProcessor(
|
||||
return ColumnProcessors.makeVectorProcessor(
|
||||
dimension,
|
||||
VectorValueMatcherColumnProcessorFactory.instance(),
|
||||
factory
|
||||
|
|
|
@ -26,9 +26,13 @@ import org.apache.druid.segment.column.ColumnCapabilities;
|
|||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* A class that comes from {@link VectorCursor#getColumnSelectorFactory()} and is used to create vector selectors.
|
||||
*
|
||||
* If you need to write code that adapts to different input types, you should write a
|
||||
* {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
|
||||
* {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this class.
|
||||
*
|
||||
* @see org.apache.druid.segment.ColumnSelectorFactory, the non-vectorized version.
|
||||
* @see org.apache.druid.segment.ColumnSelectorFactory the non-vectorized version.
|
||||
*/
|
||||
public interface VectorColumnSelectorFactory extends ColumnInspector
|
||||
{
|
||||
|
@ -48,22 +52,43 @@ public interface VectorColumnSelectorFactory extends ColumnInspector
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a string-typed, single-value-per-row column selector.
|
||||
* Returns a string-typed, single-value-per-row column selector. Should only be called on columns where
|
||||
* {@link #getColumnCapabilities} indicates they return STRING, or on nonexistent columns.
|
||||
*
|
||||
* If you need to write code that adapts to different input types, you should write a
|
||||
* {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
|
||||
* {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
|
||||
*/
|
||||
SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec);
|
||||
|
||||
/**
|
||||
* Returns a string-typed, multi-value-per-row column selector.
|
||||
* Returns a string-typed, multi-value-per-row column selector. Should only be called on columns where
|
||||
* {@link #getColumnCapabilities} indicates they return STRING. Unlike {@link #makeSingleValueDimensionSelector},
|
||||
* this should not be called on nonexistent columns.
|
||||
*
|
||||
* If you need to write code that adapts to different input types, you should write a
|
||||
* {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
|
||||
* {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
|
||||
*/
|
||||
MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec);
|
||||
|
||||
/**
|
||||
* Returns a primitive column selector.
|
||||
* Returns a primitive column selector. Should only be called on columns where {@link #getColumnCapabilities}
|
||||
* indicates they return DOUBLE, FLOAT, or LONG, or on nonexistent columns.
|
||||
*
|
||||
* If you need to write code that adapts to different input types, you should write a
|
||||
* {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
|
||||
* {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
|
||||
*/
|
||||
VectorValueSelector makeValueSelector(String column);
|
||||
|
||||
/**
|
||||
* Returns an object selector, useful for complex columns.
|
||||
* Returns an object selector. Should only be called on columns where {@link #getColumnCapabilities} indicates that
|
||||
* they return STRING or COMPLEX, or on nonexistent columns.
|
||||
*
|
||||
* If you need to write code that adapts to different input types, you should write a
|
||||
* {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
|
||||
* {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
|
||||
*/
|
||||
VectorObjectSelector makeObjectSelector(String column);
|
||||
|
||||
|
|
|
@ -326,6 +326,21 @@ public class AggregationTestHelper implements Closeable
|
|||
return runQueryOnSegments(Collections.singletonList(segmentDir), queryJson);
|
||||
}
|
||||
|
||||
public <T> Sequence<T> createIndexAndRunQueryOnSegment(
|
||||
File inputDataFile,
|
||||
String parserJson,
|
||||
String aggregators,
|
||||
long minTimestamp,
|
||||
Granularity gran,
|
||||
int maxRowCount,
|
||||
Query<T> query
|
||||
) throws Exception
|
||||
{
|
||||
File segmentDir = tempFolder.newFolder();
|
||||
createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount, true);
|
||||
return runQueryOnSegments(Collections.singletonList(segmentDir), query);
|
||||
}
|
||||
|
||||
public <T> Sequence<T> createIndexAndRunQueryOnSegment(
|
||||
File inputDataFile,
|
||||
String parserJson,
|
||||
|
|
|
@ -8856,6 +8856,36 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
|
|||
TestHelper.assertExpectedObjects(expectedResults, results, "long");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByComplexColumn()
|
||||
{
|
||||
GroupByQuery query = makeQueryBuilder()
|
||||
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
|
||||
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
|
||||
.setDimensions(new DefaultDimensionSpec("quality_uniques", "quality_uniques"))
|
||||
.setDimFilter(new SelectorDimFilter("quality_uniques", null, null))
|
||||
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
|
||||
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(Functions.<Sequence<ResultRow>>identity(), query.getLimitSpec().build(query));
|
||||
|
||||
List<ResultRow> expectedResults = Collections.singletonList(
|
||||
makeRow(
|
||||
query,
|
||||
"2011-04-01",
|
||||
"quality_uniques",
|
||||
null,
|
||||
"rows",
|
||||
26L,
|
||||
"idx",
|
||||
12446L
|
||||
)
|
||||
);
|
||||
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||
TestHelper.assertExpectedObjects(expectedResults, results, "long");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupByLongColumnDescending()
|
||||
{
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
|
|||
import org.apache.druid.query.groupby.GroupByQueryConfig;
|
||||
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
|
||||
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.ColumnProcessors;
|
||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.TestIndex;
|
||||
|
@ -72,7 +72,7 @@ public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
|
|||
);
|
||||
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
|
||||
dimensionSpec ->
|
||||
DimensionHandlerUtils.makeVectorProcessor(
|
||||
ColumnProcessors.makeVectorProcessor(
|
||||
dimensionSpec,
|
||||
GroupByVectorColumnProcessorFactory.instance(),
|
||||
cursor.getColumnSelectorFactory()
|
||||
|
|
Loading…
Reference in New Issue