ANY Aggregator should not skip null values implementation (#9317)

* ANY Aggregator should not skip null values implementation

* add tests

* add more tests

* Update documentation

* add more tests

* address review comments

* optimize StringAnyBufferAggregator

* fix failing tests

* address pr comments
This commit is contained in:
Maytas Monsereenusorn 2020-02-12 14:01:41 -08:00 committed by GitHub
parent c3ebb5eb65
commit c30579e47b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1872 additions and 316 deletions

View File

@ -238,7 +238,7 @@ Note that queries with first/last aggregators on a segment created with rollup e
(Double/Float/Long/String) ANY aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
If `druid.generic.useDefaultValueForNull=true` aggregation can returns the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then aggregation will returns any non-null value.
Returns any value including null. This aggregator can simplify and optimize the performance by returning the first encountered value (including null)
#### `doubleAny` aggregator

View File

@ -205,7 +205,7 @@ Only the COUNT aggregation can accept DISTINCT.
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`ANY_VALUE(expr)`|Returns any value of `expr`, which must be numeric. If `druid.generic.useDefaultValueForNull=true` this can return the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then this will return any non-null value of `expr`|
|`ANY_VALUE(expr)`|Returns any value of `expr` including null. `expr` must be numeric. This aggregator can simplify and optimize the performance by returning the first encountered value (including null)|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|

View File

@ -19,44 +19,31 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
/**
* This Aggregator is created by the {@link DoubleAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class DoubleAnyAggregator implements Aggregator
{
private final BaseDoubleColumnValueSelector valueSelector;
import javax.annotation.Nullable;
public class DoubleAnyAggregator extends NumericAnyAggregator<BaseDoubleColumnValueSelector>
{
private double foundValue;
private boolean isFound;
public DoubleAnyAggregator(BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
super(valueSelector);
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
void setFoundValue()
{
if (!isFound) {
foundValue = valueSelector.getDouble();
isFound = true;
}
foundValue = valueSelector.getDouble();
}
@Override
@Nullable
public Object get()
{
return foundValue;
return isNull ? null : foundValue;
}
@Override
@ -76,10 +63,4 @@ public class DoubleAnyAggregator implements Aggregator
{
return foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -19,79 +19,163 @@
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleDoubleAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class DoubleAnyAggregatorFactory extends SimpleDoubleAggregatorFactory
public class DoubleAnyAggregatorFactory extends AggregatorFactory
{
private static final Comparator<Double> VALUE_COMPARATOR = Comparator.nullsFirst(Double::compare);
private static final Aggregator NIL_AGGREGATOR = new DoubleAnyAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};
private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleAnyBufferAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};
private final String fieldName;
private final String name;
private final boolean storeDoubleAsFloat;
@JsonCreator
public DoubleAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
@JsonProperty("fieldName") final String fieldName
)
{
super(macroTable, name, fieldName, expression);
}
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
public DoubleAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
this.name = name;
this.fieldName = fieldName;
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
}
@Override
protected double nullValue()
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return Double.NaN;
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new DoubleAnyAggregator(
valueSelector
);
}
}
@Override
protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleAnyAggregator(selector);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new DoubleAnyBufferAggregator(
valueSelector
);
}
}
@Override
protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
public Comparator getComparator()
{
return new DoubleAnyBufferAggregator(selector);
return VALUE_COMPARATOR;
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
return lhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
throw new UOE("DoubleAnyAggregatorFactory is not supported during ingestion for rollup");
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleAnyAggregatorFactory(name, name, null, macroTable);
return new DoubleAnyAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new DoubleAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
return Collections.singletonList(new DoubleAnyAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Double.parseDouble((String) object);
}
return object;
}
@Override
@Nullable
public Object finalizeComputation(@Nullable Object object)
{
return object;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
@ -99,23 +183,48 @@ public class DoubleAnyAggregatorFactory extends SimpleDoubleAggregatorFactory
{
return new CacheKeyBuilder(AggregatorUtil.DOUBLE_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public String getTypeName()
{
return storeDoubleAsFloat ? "float" : "double";
}
@Override
public int getMaxIntermediateSize()
{
return Double.BYTES + Byte.BYTES;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleAnyAggregatorFactory that = (DoubleAnyAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, name);
}
@Override
public String toString()
{
return "DoubleAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -19,81 +19,55 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link DoubleAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class DoubleAnyBufferAggregator implements BufferAggregator
public class DoubleAnyBufferAggregator extends NumericAnyBufferAggregator<BaseDoubleColumnValueSelector>
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private final BaseDoubleColumnValueSelector valueSelector;
public DoubleAnyBufferAggregator(BaseDoubleColumnValueSelector valueSelector)
public DoubleAnyBufferAggregator(
BaseDoubleColumnValueSelector valueSelector
)
{
this.valueSelector = valueSelector;
super(valueSelector);
}
@Override
public void init(ByteBuffer buf, int position)
void initValue(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putDouble(position + Byte.BYTES, NullHandling.ZERO_DOUBLE);
buf.putDouble(position + FOUND_VALUE_OFFSET, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
void putValue(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putDouble(position + Byte.BYTES, valueSelector.getDouble());
}
buf.putDouble(position + FOUND_VALUE_OFFSET, valueSelector.getDouble());
}
@Override
@Nullable
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position + Byte.BYTES);
final boolean isNull = isValueNull(buf, position);
return isNull ? null : buf.getDouble(position + FOUND_VALUE_OFFSET);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Byte.BYTES);
return (float) buf.getDouble(position + FOUND_VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Byte.BYTES);
return (long) buf.getDouble(position + FOUND_VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return buf.getDouble(position + Byte.BYTES);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
return buf.getDouble(position + FOUND_VALUE_OFFSET);
}
}

View File

@ -19,44 +19,31 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
/**
* This Aggregator is created by the {@link FloatAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class FloatAnyAggregator implements Aggregator
{
private final BaseFloatColumnValueSelector valueSelector;
import javax.annotation.Nullable;
public class FloatAnyAggregator extends NumericAnyAggregator<BaseFloatColumnValueSelector>
{
private float foundValue;
private boolean isFound;
public FloatAnyAggregator(BaseFloatColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
super(valueSelector);
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
void setFoundValue()
{
if (!isFound) {
foundValue = valueSelector.getFloat();
isFound = true;
}
foundValue = valueSelector.getFloat();
}
@Override
@Nullable
public Object get()
{
return foundValue;
return isNull ? null : foundValue;
}
@Override
@ -76,10 +63,4 @@ public class FloatAnyAggregator implements Aggregator
{
return (double) foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -19,79 +19,161 @@
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleFloatAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.NilColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class FloatAnyAggregatorFactory extends SimpleFloatAggregatorFactory
public class FloatAnyAggregatorFactory extends AggregatorFactory
{
private static final Comparator<Float> VALUE_COMPARATOR = Comparator.nullsFirst(Float::compare);
private static final Aggregator NIL_AGGREGATOR = new FloatAnyAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};
private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new FloatAnyBufferAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};
private final String fieldName;
private final String name;
@JsonCreator
public FloatAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
@JsonProperty("fieldName") final String fieldName
)
{
super(macroTable, name, fieldName, expression);
}
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
public FloatAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
this.name = name;
this.fieldName = fieldName;
}
@Override
protected float nullValue()
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return Float.NaN;
final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new FloatAnyAggregator(
valueSelector
);
}
}
@Override
protected Aggregator buildAggregator(BaseFloatColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FloatAnyAggregator(selector);
final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new FloatAnyBufferAggregator(
valueSelector
);
}
}
@Override
protected BufferAggregator buildBufferAggregator(BaseFloatColumnValueSelector selector)
public Comparator getComparator()
{
return new FloatAnyBufferAggregator(selector);
return VALUE_COMPARATOR;
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
return lhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
throw new UOE("FloatAnyAggregatorFactory is not supported during ingestion for rollup");
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatAnyAggregatorFactory(name, name, null, macroTable);
return new FloatAnyAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new FloatAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
return Collections.singletonList(new FloatAnyAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Float.parseFloat((String) object);
}
return object;
}
@Override
@Nullable
public Object finalizeComputation(@Nullable Object object)
{
return object;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
@ -99,23 +181,48 @@ public class FloatAnyAggregatorFactory extends SimpleFloatAggregatorFactory
{
return new CacheKeyBuilder(AggregatorUtil.FLOAT_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Float.BYTES + Byte.BYTES;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FloatAnyAggregatorFactory that = (FloatAnyAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, name);
}
@Override
public String toString()
{
return "FloatAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -19,81 +19,57 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link FloatAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class FloatAnyBufferAggregator implements BufferAggregator
public class FloatAnyBufferAggregator extends NumericAnyBufferAggregator<BaseFloatColumnValueSelector>
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private static final float NULL_VALUE = 0;
private final BaseFloatColumnValueSelector valueSelector;
public FloatAnyBufferAggregator(BaseFloatColumnValueSelector valueSelector)
public FloatAnyBufferAggregator(
BaseFloatColumnValueSelector valueSelector
)
{
this.valueSelector = valueSelector;
super(valueSelector);
}
@Override
void initValue(ByteBuffer buf, int position)
{
buf.putFloat(position + FOUND_VALUE_OFFSET, 0);
}
@Override
public void init(ByteBuffer buf, int position)
void putValue(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putFloat(position + Byte.BYTES, NULL_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putFloat(position + Byte.BYTES, valueSelector.getFloat());
}
buf.putFloat(position + FOUND_VALUE_OFFSET, valueSelector.getFloat());
}
@Override
@Nullable
public Object get(ByteBuffer buf, int position)
{
return buf.getFloat(position + Byte.BYTES);
final boolean isNull = isValueNull(buf, position);
return isNull ? null : buf.getFloat(position + FOUND_VALUE_OFFSET);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return buf.getFloat(position + Byte.BYTES);
return buf.getFloat(position + FOUND_VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getFloat(position + Byte.BYTES);
return (long) buf.getFloat(position + FOUND_VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getFloat(position + Byte.BYTES);
return (double) buf.getFloat(position + FOUND_VALUE_OFFSET);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -19,44 +19,31 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseLongColumnValueSelector;
/**
* This Aggregator is created by the {@link LongAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class LongAnyAggregator implements Aggregator
{
private final BaseLongColumnValueSelector valueSelector;
import javax.annotation.Nullable;
public class LongAnyAggregator extends NumericAnyAggregator<BaseLongColumnValueSelector>
{
private long foundValue;
private boolean isFound;
public LongAnyAggregator(BaseLongColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
super(valueSelector);
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
void setFoundValue()
{
if (!isFound) {
foundValue = valueSelector.getLong();
isFound = true;
}
foundValue = valueSelector.getLong();
}
@Override
@Nullable
public Object get()
{
return foundValue;
return isNull ? null : foundValue;
}
@Override
@ -76,10 +63,4 @@ public class LongAnyAggregator implements Aggregator
{
return (double) foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -19,79 +19,159 @@
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.NilColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class LongAnyAggregatorFactory extends SimpleLongAggregatorFactory
public class LongAnyAggregatorFactory extends AggregatorFactory
{
private static final Comparator<Long> VALUE_COMPARATOR = Comparator.nullsFirst(Long::compare);
private static final Aggregator NIL_AGGREGATOR = new LongAnyAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};
private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new LongAnyBufferAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};
private final String fieldName;
private final String name;
@JsonCreator
public LongAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
@JsonProperty("fieldName") final String fieldName
)
{
super(macroTable, name, fieldName, expression);
}
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
public LongAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
this.name = name;
this.fieldName = fieldName;
}
@Override
protected long nullValue()
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return 0;
final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new LongAnyAggregator(
valueSelector
);
}
}
@Override
protected Aggregator buildAggregator(BaseLongColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongAnyAggregator(selector);
final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new LongAnyBufferAggregator(
valueSelector
);
}
}
@Override
protected BufferAggregator buildBufferAggregator(BaseLongColumnValueSelector selector)
public Comparator getComparator()
{
return new LongAnyBufferAggregator(selector);
return VALUE_COMPARATOR;
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
return lhs;
}
@Override
public AggregateCombiner makeAggregateCombiner()
{
throw new UOE("LongAnyAggregatorFactory is not supported during ingestion for rollup");
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongAnyAggregatorFactory(name, name, null, macroTable);
return new LongAnyAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new LongAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
return Collections.singletonList(new LongAnyAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Float.parseFloat((String) object);
}
return object;
}
@Override
@Nullable
public Object finalizeComputation(@Nullable Object object)
{
return object;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
@ -99,23 +179,51 @@ public class LongAnyAggregatorFactory extends SimpleLongAggregatorFactory
{
return new CacheKeyBuilder(AggregatorUtil.LONG_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public String getTypeName()
{
return "long";
}
@Override
public int getMaxIntermediateSize()
{
return Long.BYTES + Byte.BYTES;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongAnyAggregatorFactory that = (LongAnyAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
@Override
public String toString()
{
return "LongAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -19,81 +19,55 @@
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link LongAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class LongAnyBufferAggregator implements BufferAggregator
public class LongAnyBufferAggregator extends NumericAnyBufferAggregator<BaseLongColumnValueSelector>
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private static final long NULL_VALUE = 0;
private final BaseLongColumnValueSelector valueSelector;
public LongAnyBufferAggregator(BaseLongColumnValueSelector valueSelector)
public LongAnyBufferAggregator(
BaseLongColumnValueSelector valueSelector
)
{
this.valueSelector = valueSelector;
super(valueSelector);
}
@Override
public void init(ByteBuffer buf, int position)
void initValue(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putLong(position + Byte.BYTES, NULL_VALUE);
buf.putLong(position + FOUND_VALUE_OFFSET, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
void putValue(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putLong(position + Byte.BYTES, valueSelector.getLong());
}
buf.putLong(position + FOUND_VALUE_OFFSET, valueSelector.getLong());
}
@Override
@Nullable
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position + Byte.BYTES);
final boolean isNull = isValueNull(buf, position);
return isNull ? null : buf.getLong(position + FOUND_VALUE_OFFSET);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Byte.BYTES);
return (float) buf.getLong(position + FOUND_VALUE_OFFSET);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getLong(position + Byte.BYTES);
return (double) buf.getLong(position + FOUND_VALUE_OFFSET);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Byte.BYTES);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
return buf.getLong(position + FOUND_VALUE_OFFSET);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
/**
* Base type for on heap 'any' aggregator for primitive numeric column selectors
*/
public abstract class NumericAnyAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
{
private final boolean useDefault = NullHandling.replaceWithDefault();
final TSelector valueSelector;
boolean isNull;
boolean isFound;
public NumericAnyAggregator(TSelector valueSelector)
{
this.valueSelector = valueSelector;
this.isNull = !useDefault;
this.isFound = false;
}
/**
* Store the found primitive value
*/
abstract void setFoundValue();
@Override
public void aggregate()
{
if (!isFound) {
if (useDefault || !valueSelector.isNull()) {
setFoundValue();
isNull = false;
} else {
isNull = true;
}
isFound = true;
}
}
@Override
public void close()
{
// nothing to close
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import java.nio.ByteBuffer;
/**
* Base type for buffer based 'any' aggregator for primitive numeric column selectors
*/
public abstract class NumericAnyBufferAggregator<TSelector extends BaseNullableColumnValueSelector>
implements BufferAggregator
{
// Rightmost bit for is null check (0 for is null and 1 for not null)
// Second rightmost bit for is found check (0 for not found and 1 for found)
private static final byte BYTE_FLAG_FOUND_MASK = 0x02;
private static final byte BYTE_FLAG_NULL_MASK = 0x01;
static final int FOUND_VALUE_OFFSET = Byte.BYTES;
private final boolean useDefault = NullHandling.replaceWithDefault();
final TSelector valueSelector;
public NumericAnyBufferAggregator(TSelector valueSelector)
{
this.valueSelector = valueSelector;
}
/**
* Initialize the buffer value given the initial offset position within the byte buffer for initialization
*/
abstract void initValue(ByteBuffer buf, int position);
/**
* Place the primitive value in the buffer given the initial offset position within the byte buffer
* at which the current aggregate value is stored
*/
abstract void putValue(ByteBuffer buf, int position);
@Override
public void init(ByteBuffer buf, int position)
{
buf.put(position, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
initValue(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
if ((buf.get(position) & BYTE_FLAG_FOUND_MASK) != BYTE_FLAG_FOUND_MASK) {
if (useDefault || !valueSelector.isNull()) {
putValue(buf, position);
buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NOT_NULL_BYTE));
} else {
buf.put(position, (byte) (BYTE_FLAG_FOUND_MASK | NullHandling.IS_NULL_BYTE));
}
}
}
boolean isValueNull(ByteBuffer buf, int position)
{
return (buf.get(position) & BYTE_FLAG_NULL_MASK) == NullHandling.IS_NULL_BYTE;
}
@Override
public void close()
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -28,7 +28,7 @@ public class StringAnyAggregator implements Aggregator
{
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes;
private boolean isFound;
private String foundValue;
public StringAnyAggregator(BaseObjectColumnValueSelector valueSelector, int maxStringBytes)
@ -36,17 +36,19 @@ public class StringAnyAggregator implements Aggregator
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
this.foundValue = null;
this.isFound = false;
}
@Override
public void aggregate()
{
if (foundValue == null) {
if (!isFound) {
final Object object = valueSelector.getObject();
foundValue = DimensionHandlerUtils.convertObjectToString(object);
if (foundValue != null && foundValue.length() > maxStringBytes) {
foundValue = foundValue.substring(0, maxStringBytes);
}
isFound = true;
}
}

View File

@ -85,11 +85,7 @@ public class StringAnyAggregatorFactory extends AggregatorFactory
@Override
public Object combine(Object lhs, Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
return lhs;
}
@Override

View File

@ -28,7 +28,10 @@ import java.nio.ByteBuffer;
public class StringAnyBufferAggregator implements BufferAggregator
{
private static final int NULL_STRING_LENGTH = -1;
private static final int FOUND_AND_NULL_FLAG_VALUE = -1;
private static final int NOT_FOUND_FLAG_VALUE = -2;
private static final int FOUND_VALUE_OFFSET = Integer.BYTES;
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes;
@ -41,22 +44,23 @@ public class StringAnyBufferAggregator implements BufferAggregator
@Override
public void init(ByteBuffer buf, int position)
{
buf.putInt(position, NULL_STRING_LENGTH);
buf.putInt(position, NOT_FOUND_FLAG_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
int stringSizeBytes = buf.getInt(position);
if (stringSizeBytes < 0) {
if (buf.getInt(position) == NOT_FOUND_FLAG_VALUE) {
final Object object = valueSelector.getObject();
String foundValue = DimensionHandlerUtils.convertObjectToString(object);
if (foundValue != null) {
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position + Integer.BYTES);
mutationBuffer.limit(position + Integer.BYTES + maxStringBytes);
mutationBuffer.position(position + FOUND_VALUE_OFFSET);
mutationBuffer.limit(position + FOUND_VALUE_OFFSET + maxStringBytes);
final int len = StringUtils.toUtf8WithLimit(foundValue, mutationBuffer);
mutationBuffer.putInt(position, len);
} else {
buf.putInt(position, FOUND_AND_NULL_FLAG_VALUE);
}
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestDoubleColumnSelectorImpl;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class DoubleAnyAggregationTest extends InitializedNullHandlingTest
{
private DoubleAnyAggregatorFactory doubleAnyAggFactory;
private DoubleAnyAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestDoubleColumnSelectorImpl valueSelector;
private TestObjectColumnSelector objectSelector;
private double[] doubles = {1.1897d, 0.001d, 86.23d, 166.228d};
private Double[] objects = {2.1897d, 1.001d, 87.23d, 167.228d};
@Before
public void setup()
{
doubleAnyAggFactory = new DoubleAnyAggregatorFactory("billy", "nilly");
combiningAggFactory = (DoubleAnyAggregatorFactory) doubleAnyAggFactory.getCombiningFactory();
valueSelector = new TestDoubleColumnSelectorImpl(doubles);
objectSelector = new TestObjectColumnSelector<>(objects);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testDoubleAnyAggregator()
{
Aggregator agg = doubleAnyAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Double result = (Double) agg.get();
Assert.assertEquals((Double) doubles[0], result);
Assert.assertEquals((long) doubles[0], agg.getLong());
Assert.assertEquals(doubles[0], agg.getDouble(), 0.0001);
}
@Test
public void testDoubleAnyBufferAggregator()
{
BufferAggregator agg = doubleAnyAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Double result = (Double) agg.get(buffer, 0);
Assert.assertEquals(doubles[0], result, 0.0001);
Assert.assertEquals((long) doubles[0], agg.getLong(buffer, 0));
Assert.assertEquals(doubles[0], agg.getDouble(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
Double d1 = 3.0;
Double d2 = 4.0;
Assert.assertEquals(d1, doubleAnyAggFactory.combine(d1, d2));
}
@Test
public void testComparatorWithNulls()
{
Double d1 = 3.0;
Double d2 = null;
Comparator comparator = doubleAnyAggFactory.getComparator();
Assert.assertEquals(1, comparator.compare(d1, d2));
Assert.assertEquals(0, comparator.compare(d1, d1));
Assert.assertEquals(0, comparator.compare(d2, d2));
Assert.assertEquals(-1, comparator.compare(d2, d1));
}
@Test
public void testDoubleAnyCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Double result = (Double) agg.get();
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong());
Assert.assertEquals(objects[0], agg.getDouble(), 0.0001);
}
@Test
public void testDoubleAnyCombiningBufferAggregator()
{
BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Double result = (Double) agg.get(buffer, 0);
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(objects[0], agg.getDouble(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String doubleSpecJson = "{\"type\":\"doubleAny\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(doubleAnyAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
}
private void aggregate(
Aggregator agg
)
{
agg.aggregate();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
BufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestFloatColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class FloatAnyAggregationTest extends InitializedNullHandlingTest
{
private FloatAnyAggregatorFactory floatAnyAggFactory;
private FloatAnyAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestFloatColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private float[] floats = {1.1897f, 0.001f, 86.23f, 166.228f};
private Float[] objects = {2.1897f, 1.001f, 87.23f, 167.228f};
@Before
public void setup()
{
floatAnyAggFactory = new FloatAnyAggregatorFactory("billy", "nilly");
combiningAggFactory = (FloatAnyAggregatorFactory) floatAnyAggFactory.getCombiningFactory();
valueSelector = new TestFloatColumnSelector(floats);
objectSelector = new TestObjectColumnSelector<>(objects);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testFloatAnyAggregator()
{
Aggregator agg = floatAnyAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Float result = (Float) agg.get();
Assert.assertEquals((Float) floats[0], result);
Assert.assertEquals((long) floats[0], agg.getLong());
Assert.assertEquals(floats[0], agg.getFloat(), 0.0001);
}
@Test
public void testFloatAnyBufferAggregator()
{
BufferAggregator agg = floatAnyAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[floatAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Float result = (Float) agg.get(buffer, 0);
Assert.assertEquals(floats[0], result, 0.0001);
Assert.assertEquals((long) floats[0], agg.getLong(buffer, 0));
Assert.assertEquals(floats[0], agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
Float f1 = 3.0f;
Float f2 = 4.0f;
Assert.assertEquals(f1, floatAnyAggFactory.combine(f1, f2));
}
@Test
public void testComparatorWithNulls()
{
Float f1 = 3.0f;
Float f2 = null;
Comparator comparator = floatAnyAggFactory.getComparator();
Assert.assertEquals(1, comparator.compare(f1, f2));
Assert.assertEquals(0, comparator.compare(f1, f1));
Assert.assertEquals(0, comparator.compare(f2, f2));
Assert.assertEquals(-1, comparator.compare(f2, f1));
}
@Test
public void testFloatAnyCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Float result = (Float) agg.get();
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong());
Assert.assertEquals(objects[0], agg.getFloat(), 0.0001);
}
@Test
public void testFloatAnyCombiningBufferAggregator()
{
BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[floatAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Float result = (Float) agg.get(buffer, 0);
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(objects[0], agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String floatSpecJson = "{\"type\":\"floatAny\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(floatAnyAggFactory, mapper.readValue(floatSpecJson, AggregatorFactory.class));
}
private void aggregate(
Aggregator agg
)
{
agg.aggregate();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
BufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class LongAnyAggregationTest extends InitializedNullHandlingTest
{
private LongAnyAggregatorFactory longAnyAggFactory;
private LongAnyAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private long[] longs = {185, -216, -128751132, Long.MIN_VALUE};
private Long[] objects = {1123126751L, 1784247991L, 1854329816L, 1000000000L};
@Before
public void setup()
{
longAnyAggFactory = new LongAnyAggregatorFactory("billy", "nilly");
combiningAggFactory = (LongAnyAggregatorFactory) longAnyAggFactory.getCombiningFactory();
valueSelector = new TestLongColumnSelector(longs);
objectSelector = new TestObjectColumnSelector<>(objects);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testLongAnyAggregator()
{
Aggregator agg = longAnyAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Long result = (Long) agg.get();
Assert.assertEquals((Long) longs[0], result);
Assert.assertEquals((long) longs[0], agg.getLong());
Assert.assertEquals(longs[0], agg.getLong(), 0.0001);
}
@Test
public void testLongAnyBufferAggregator()
{
BufferAggregator agg = longAnyAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Long result = (Long) agg.get(buffer, 0);
Assert.assertEquals(longs[0], result, 0.0001);
Assert.assertEquals((long) longs[0], agg.getLong(buffer, 0));
Assert.assertEquals(longs[0], agg.getLong(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
Long l1 = 3L;
Long l2 = 4L;
Assert.assertEquals(l1, longAnyAggFactory.combine(l1, l2));
}
@Test
public void testComparatorWithNulls()
{
Long l1 = 3L;
Long l2 = null;
Comparator comparator = longAnyAggFactory.getComparator();
Assert.assertEquals(1, comparator.compare(l1, l2));
Assert.assertEquals(0, comparator.compare(l1, l1));
Assert.assertEquals(0, comparator.compare(l2, l2));
Assert.assertEquals(-1, comparator.compare(l2, l1));
}
@Test
public void testLongAnyCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Long result = (Long) agg.get();
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong());
Assert.assertEquals(objects[0], agg.getLong(), 0.0001);
}
@Test
public void testLongAnyCombiningBufferAggregator()
{
BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longAnyAggFactory.getMaxIntermediateSizeWithNulls()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Long result = (Long) agg.get(buffer, 0);
Assert.assertEquals(objects[0], result, 0.0001);
Assert.assertEquals(objects[0].longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(objects[0], agg.getLong(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String longSpecJson = "{\"type\":\"longAny\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(longAnyAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
}
private void aggregate(
Aggregator agg
)
{
agg.aggregate();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
BufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
public class StringAnyAggregationTest
{
private final Integer MAX_STRING_SIZE = 1024;
private AggregatorFactory stringAnyAggFactory;
private AggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestObjectColumnSelector<String> valueSelector;
private TestObjectColumnSelector objectSelector;
private String[] strings = {"1111", "2222", "3333", null, "4444"};
private String[] stringsWithNullFirst = {null, "1111", "2222", "3333", null, "4444"};
@Before
public void setup()
{
stringAnyAggFactory = new StringAnyAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
combiningAggFactory = stringAnyAggFactory.getCombiningFactory();
valueSelector = new TestObjectColumnSelector<>(strings);
objectSelector = new TestObjectColumnSelector<>(strings);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
.andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testStringAnyAggregator()
{
Aggregator agg = stringAnyAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
String result = (String) agg.get();
Assert.assertEquals(strings[0], result);
}
@Test
public void testStringAnyAggregatorWithNullFirst()
{
valueSelector = new TestObjectColumnSelector<>(stringsWithNullFirst);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
.andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.replay(colSelectorFactory);
Aggregator agg = stringAnyAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
String result = (String) agg.get();
Assert.assertNull(result);
}
@Test
public void testStringAnyBufferAggregator()
{
BufferAggregator agg = stringAnyAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringAnyAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
String result = (String) agg.get(buffer, 0);
Assert.assertEquals(strings[0], result);
}
@Test
public void testStringAnyBufferAggregatorWithNullFirst()
{
valueSelector = new TestObjectColumnSelector<>(stringsWithNullFirst);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
.andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.replay(colSelectorFactory);
BufferAggregator agg = stringAnyAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringAnyAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
String result = (String) agg.get(buffer, 0);
Assert.assertNull(result);
}
@Test
public void testCombine()
{
String s1 = "aaaaaa";
String s2 = "aaaaaa";
Assert.assertEquals(s1, stringAnyAggFactory.combine(s1, s2));
}
@Test
public void testStringAnyCombiningAggregator()
{
Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
String result = (String) agg.get();
Assert.assertEquals(strings[0], result);
Assert.assertEquals(strings[0], result);
}
@Test
public void testStringAnyCombiningBufferAggregator()
{
BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringAnyAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
String result = (String) agg.get(buffer, 0);
Assert.assertEquals(strings[0], result);
Assert.assertEquals(strings[0], result);
}
private void aggregate(
Aggregator agg
)
{
agg.aggregate();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
BufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
public class StringAnyBufferAggregatorTest
{
private void aggregateBuffer(
TestObjectColumnSelector valueSelector,
BufferAggregator agg,
ByteBuffer buf,
int position
)
{
agg.aggregate(buf, position);
valueSelector.increment();
}
@Test
public void testBufferAggregate()
{
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringAnyAggregatorFactory factory = new StringAnyAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringAnyBufferAggregator agg = new StringAnyBufferAggregator(
objectColumnSelector,
maxStringBytes
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < strings.length; i++) {
aggregateBuffer(objectColumnSelector, agg, buf, position);
}
String result = ((String) agg.get(buf, position));
Assert.assertEquals(strings[0], result);
}
@Test
public void testBufferAggregateWithFoldCheck()
{
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringAnyAggregatorFactory factory = new StringAnyAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringAnyBufferAggregator agg = new StringAnyBufferAggregator(
objectColumnSelector,
maxStringBytes
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < strings.length; i++) {
aggregateBuffer(objectColumnSelector, agg, buf, position);
}
String result = ((String) agg.get(buf, position));
Assert.assertEquals(strings[0], result);
}
@Test
public void testContainsNullBufferAggregate()
{
final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
Integer maxStringBytes = 1024;
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringAnyAggregatorFactory factory = new StringAnyAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringAnyBufferAggregator agg = new StringAnyBufferAggregator(
objectColumnSelector,
maxStringBytes
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < strings.length; i++) {
aggregateBuffer(objectColumnSelector, agg, buf, position);
}
String result = ((String) agg.get(buf, position));
Assert.assertEquals(strings[0], result);
}
@Test
public void testNullFirstBufferAggregate()
{
final String[] strings = {null, "CCCC", "AAAA", "BBBB", "EEEE"};
Integer maxStringBytes = 1024;
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringAnyAggregatorFactory factory = new StringAnyAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringAnyBufferAggregator agg = new StringAnyBufferAggregator(
objectColumnSelector,
maxStringBytes
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < strings.length; i++) {
aggregateBuffer(objectColumnSelector, agg, buf, position);
}
String result = ((String) agg.get(buf, position));
Assert.assertNull(result);
}
@Test
public void testNonStringValue()
{
final Double[] doubles = {1.00, 2.00};
Integer maxStringBytes = 1024;
TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
StringAnyAggregatorFactory factory = new StringAnyAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringAnyBufferAggregator agg = new StringAnyBufferAggregator(
objectColumnSelector,
maxStringBytes
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < doubles.length; i++) {
aggregateBuffer(objectColumnSelector, agg, buf, position);
}
String result = ((String) agg.get(buf, position));
Assert.assertEquals("1.0", result);
}
}

View File

@ -318,14 +318,14 @@ public class QueryLifecycle
if (e != null) {
statsMap.put("exception", e.toString());
log.noStackTrace().warn(e, "Exception while processing queryId [%s]", baseQuery.getId());
if (e instanceof QueryInterruptedException) {
// Mimic behavior from QueryResource, where this code was originally taken from.
log.noStackTrace().warn(e, "Exception while processing queryId [%s]", baseQuery.getId());
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
}
}
requestLogger.logNativeQuery(
RequestLogLine.forNative(
baseQuery,

View File

@ -1339,7 +1339,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "", 2L, 2.0f, "1"}) : ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "10.1", 2L, 2.0f, "1"})
NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "", 2L, 2.0f, "1"}) : ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "", 2L, 2.0f, "1"})
);
}
@ -1677,7 +1677,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
new Object[]{NullHandling.sqlCompatible() ? 12.1 : 10.1}
)
);
}
@ -1785,6 +1785,82 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testAnyAggregatorsDoesNotSkipNulls() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT ANY_VALUE(dim1, 32), ANY_VALUE(l2), ANY_VALUE(d2), ANY_VALUE(f2) FROM druid.numfoo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
aggregators(
new StringAnyAggregatorFactory("a0", "dim1", 32),
new LongAnyAggregatorFactory("a1", "l2"),
new DoubleAnyAggregatorFactory("a2", "d2"),
new FloatAnyAggregatorFactory("a3", "f2")
)
)
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
// first row has null for l2, d2, f2 and dim1 as empty string (which is null in default mode)
ImmutableList.of(
useDefault ? new Object[]{"", 0L, 0.0, 0f} : new Object[]{"", null, null, null}
)
);
}
@Test
public void testAnyAggregatorsSkipNullsWithFilter() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
final DimFilter filter;
if (useDefault) {
filter = not(selector("dim1", null, null));
} else {
filter = and(
not(selector("dim1", null, null)),
not(selector("l2", null, null)),
not(selector("d2", null, null)),
not(selector("f2", null, null))
);
}
testQuery(
"SELECT ANY_VALUE(dim1, 32), ANY_VALUE(l2), ANY_VALUE(d2), ANY_VALUE(f2) "
+ "FROM druid.numfoo "
+ "WHERE dim1 IS NOT NULL AND l2 IS NOT NULL AND d2 IS NOT NULL AND f2 is NOT NULL",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.filters(filter)
.aggregators(
aggregators(
new StringAnyAggregatorFactory("a0", "dim1", 32),
new LongAnyAggregatorFactory("a1", "l2"),
new DoubleAnyAggregatorFactory("a2", "d2"),
new FloatAnyAggregatorFactory("a3", "f2")
)
)
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
// first row of dim1 is empty string, which is null in default mode
new Object[]{"10.1", 325323L, 1.7, 0.1f}
)
);
}
@Test
public void testOrderByEarliestFloat() throws Exception
{
@ -2068,6 +2144,154 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testOrderByAnyFloat() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
List<Object[]> expected;
if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of(
new Object[]{"1", 0.0f},
new Object[]{"2", 0.0f},
new Object[]{"abc", 0.0f},
new Object[]{"def", 0.0f},
new Object[]{"10.1", 0.1f},
new Object[]{"", 1.0f}
);
} else {
expected = ImmutableList.of(
new Object[]{"2", 0.0f},
new Object[]{"10.1", 0.1f},
new Object[]{"", 1.0f},
// Nulls are last because of the null first wrapped Comparator in InvertedTopNMetricSpec which is then
// reversed by TopNNumericResultBuilder.build()
new Object[]{"1", null},
new Object[]{"abc", null},
new Object[]{"def", null}
);
}
testQuery(
"SELECT dim1, ANY_VALUE(f1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT 10",
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "_d0"))
.aggregators(
aggregators(
new FloatAnyAggregatorFactory("a0", "f1")
)
)
.metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
.threshold(10)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
@Test
public void testOrderByAnyDouble() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
List<Object[]> expected;
if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of(
new Object[]{"1", 0.0},
new Object[]{"2", 0.0},
new Object[]{"abc", 0.0},
new Object[]{"def", 0.0},
new Object[]{"", 1.0},
new Object[]{"10.1", 1.7}
);
} else {
expected = ImmutableList.of(
new Object[]{"2", 0.0},
new Object[]{"", 1.0},
new Object[]{"10.1", 1.7},
// Nulls are last because of the null first wrapped Comparator in InvertedTopNMetricSpec which is then
// reversed by TopNNumericResultBuilder.build()
new Object[]{"1", null},
new Object[]{"abc", null},
new Object[]{"def", null}
);
}
testQuery(
"SELECT dim1, ANY_VALUE(d1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT 10",
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "_d0"))
.aggregators(
aggregators(
new DoubleAnyAggregatorFactory("a0", "d1")
)
)
.metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
.threshold(10)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
@Test
public void testOrderByAnyLong() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
List<Object[]> expected;
if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of(
new Object[]{"1", 0L},
new Object[]{"2", 0L},
new Object[]{"abc", 0L},
new Object[]{"def", 0L},
new Object[]{"", 7L},
new Object[]{"10.1", 325323L}
);
} else {
expected = ImmutableList.of(
new Object[]{"2", 0L},
new Object[]{"", 7L},
new Object[]{"10.1", 325323L},
// Nulls are last because of the null first wrapped Comparator in InvertedTopNMetricSpec which is then
// reversed by TopNNumericResultBuilder.build()
new Object[]{"1", null},
new Object[]{"abc", null},
new Object[]{"def", null}
);
}
testQuery(
"SELECT dim1, ANY_VALUE(l1) FROM druid.numfoo GROUP BY 1 ORDER BY 2 LIMIT 10",
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "_d0"))
.aggregators(
aggregators(
new LongAnyAggregatorFactory("a0", "l1")
)
)
.metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
.threshold(10)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expected
);
}
@Test
public void testGroupByLong() throws Exception
{

View File

@ -269,8 +269,11 @@ public class CalciteTests
ImmutableList.<DimensionSchema>builder()
.addAll(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3")))
.add(new DoubleDimensionSchema("d1"))
.add(new DoubleDimensionSchema("d2"))
.add(new FloatDimensionSchema("f1"))
.add(new FloatDimensionSchema("f2"))
.add(new LongDimensionSchema("l1"))
.add(new LongDimensionSchema("l2"))
.build(),
null,
null
@ -414,8 +417,11 @@ public class CalciteTests
.put("m1", "2.0")
.put("m2", "2.0")
.put("d1", 1.7)
.put("d2", 1.7)
.put("f1", 0.1f)
.put("f2", 0.1f)
.put("l1", 325323L)
.put("l2", 325323L)
.put("dim1", "10.1")
.put("dim2", ImmutableList.of())
.put("dim3", ImmutableList.of("b", "c"))
@ -428,8 +434,11 @@ public class CalciteTests
.put("m1", "3.0")
.put("m2", "3.0")
.put("d1", 0.0)
.put("d2", 0.0)
.put("f1", 0.0)
.put("f2", 0.0)
.put("l1", 0)
.put("l2", 0)
.put("dim1", "2")
.put("dim2", ImmutableList.of(""))
.put("dim3", ImmutableList.of("d"))