Implement ANY aggregator (#9187)

* Implement ANY aggregator

* Add copyright headers

* Add unit tests

* fix BufferAggregator

* Fix bug in BufferAggregator

* hook up the SQL command

* add check for buffer aggregator

* Address comment

* address comments

* add docs

* Address comments

* add more tests for numeric columns that have null values when run in sql compatible null mode

* fix checkstyle errors

* fix failing tests

* fix failing tests
This commit is contained in:
Maytas Monsereenusorn 2020-01-16 14:40:32 -08:00 committed by Jonathan Wei
parent a87db7f353
commit 42359c93dd
24 changed files with 1649 additions and 37 deletions

View File

@ -546,4 +546,26 @@ public class StringUtils
return new String(data); return new String(data);
} }
/**
* Returns the string truncated to maxBytes.
* If given string input is shorter than maxBytes, then it remains the same.
*
* @param s The input string to possibly be truncated
* @param maxBytes The max bytes that string input will be truncated to
*
* @return the string after truncated to maxBytes
*/
@Nullable
public static String chop(@Nullable final String s, final int maxBytes)
{
if (s == null) {
return null;
} else {
// Shorten firstValue to what could fit in maxBytes as UTF-8.
final byte[] bytes = new byte[maxBytes];
final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
return new String(bytes, 0, len, StandardCharsets.UTF_8);
}
}
} }

View File

@ -236,6 +236,61 @@ Note that queries with first/last aggregators on a segment created with rollup e
} }
``` ```
### ANY aggregator
(Double/Float/Long/String) ANY aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
If `druid.generic.useDefaultValueForNull=true` aggregation can returns the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then aggregation will returns any non-null value.
#### `doubleAny` aggregator
`doubleAny` returns any double metric value
```json
{
"type" : "doubleAny",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `floatAny` aggregator
`floatAny` returns any float metric value
```json
{
"type" : "floatAny",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `longAny` aggregator
`longAny` returns any long metric value
```json
{
"type" : "longAny",
"name" : <output_name>,
"fieldName" : <metric_name>,
}
```
#### `stringAny` aggregator
`stringAny` returns any string metric value
```json
{
"type" : "stringAny",
"name" : <output_name>,
"fieldName" : <metric_name>,
"maxStringBytes" : <integer> # (optional, defaults to 1024),
}
```
### JavaScript aggregator ### JavaScript aggregator
Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your

View File

@ -203,6 +203,10 @@ Only the COUNT aggregation can accept DISTINCT.
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.| |`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`ANY_VALUE(expr)`|Returns any value of `expr`, which must be numeric. If `druid.generic.useDefaultValueForNull=true` this can return the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then this will return any non-null value of `expr`|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx). For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).

View File

@ -38,6 +38,10 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
@ -110,7 +114,11 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class), @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class), @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class), @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class) @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class),
@JsonSubTypes.Type(name = "longAny", value = LongAnyAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatAny", value = FloatAnyAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleAny", value = DoubleAnyAggregatorFactory.class),
@JsonSubTypes.Type(name = "stringAny", value = StringAnyAggregatorFactory.class)
}) })
public interface AggregatorFactoryMixin public interface AggregatorFactoryMixin
{ {

View File

@ -121,6 +121,12 @@ public class AggregatorUtil
public static final byte MEAN_CACHE_TYPE_ID = 0x41; public static final byte MEAN_CACHE_TYPE_ID = 0x41;
// ANY aggregator
public static final byte LONG_ANY_CACHE_TYPE_ID = 0x42;
public static final byte DOUBLE_ANY_CACHE_TYPE_ID = 0x43;
public static final byte FLOAT_ANY_CACHE_TYPE_ID = 0x44;
public static final byte STRING_ANY_CACHE_TYPE_ID = 0x45;
/** /**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
* *

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
/**
* This Aggregator is created by the {@link DoubleAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class DoubleAnyAggregator implements Aggregator
{
private final BaseDoubleColumnValueSelector valueSelector;
private double foundValue;
private boolean isFound;
public DoubleAnyAggregator(BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
{
if (!isFound) {
foundValue = valueSelector.getDouble();
isFound = true;
}
}
@Override
public Object get()
{
return foundValue;
}
@Override
public float getFloat()
{
return (float) foundValue;
}
@Override
public long getLong()
{
return (long) foundValue;
}
@Override
public double getDouble()
{
return foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleDoubleAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
public class DoubleAnyAggregatorFactory extends SimpleDoubleAggregatorFactory
{
@JsonCreator
public DoubleAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public DoubleAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
protected double nullValue()
{
return Double.NaN;
}
@Override
protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
{
return new DoubleAnyAggregator(selector);
}
@Override
protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
{
return new DoubleAnyBufferAggregator(selector);
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleAnyAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new DoubleAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.DOUBLE_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public int getMaxIntermediateSize()
{
return Double.BYTES + Byte.BYTES;
}
@Override
public String toString()
{
return "DoubleAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link DoubleAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class DoubleAnyBufferAggregator implements BufferAggregator
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private final BaseDoubleColumnValueSelector valueSelector;
public DoubleAnyBufferAggregator(BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putDouble(position + Byte.BYTES, NullHandling.ZERO_DOUBLE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putDouble(position + Byte.BYTES, valueSelector.getDouble());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position + Byte.BYTES);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Byte.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Byte.BYTES);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return buf.getDouble(position + Byte.BYTES);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
/**
* This Aggregator is created by the {@link FloatAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class FloatAnyAggregator implements Aggregator
{
private final BaseFloatColumnValueSelector valueSelector;
private float foundValue;
private boolean isFound;
public FloatAnyAggregator(BaseFloatColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
{
if (!isFound) {
foundValue = valueSelector.getFloat();
isFound = true;
}
}
@Override
public Object get()
{
return foundValue;
}
@Override
public float getFloat()
{
return foundValue;
}
@Override
public long getLong()
{
return (long) foundValue;
}
@Override
public double getDouble()
{
return (double) foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleFloatAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
public class FloatAnyAggregatorFactory extends SimpleFloatAggregatorFactory
{
@JsonCreator
public FloatAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public FloatAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
protected float nullValue()
{
return Float.NaN;
}
@Override
protected Aggregator buildAggregator(BaseFloatColumnValueSelector selector)
{
return new FloatAnyAggregator(selector);
}
@Override
protected BufferAggregator buildBufferAggregator(BaseFloatColumnValueSelector selector)
{
return new FloatAnyBufferAggregator(selector);
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatAnyAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new FloatAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.FLOAT_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public int getMaxIntermediateSize()
{
return Float.BYTES + Byte.BYTES;
}
@Override
public String toString()
{
return "FloatAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link FloatAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class FloatAnyBufferAggregator implements BufferAggregator
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private static final float NULL_VALUE = 0;
private final BaseFloatColumnValueSelector valueSelector;
public FloatAnyBufferAggregator(BaseFloatColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putFloat(position + Byte.BYTES, NULL_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putFloat(position + Byte.BYTES, valueSelector.getFloat());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getFloat(position + Byte.BYTES);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return buf.getFloat(position + Byte.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getFloat(position + Byte.BYTES);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getFloat(position + Byte.BYTES);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseLongColumnValueSelector;
/**
* This Aggregator is created by the {@link LongAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class LongAnyAggregator implements Aggregator
{
private final BaseLongColumnValueSelector valueSelector;
private long foundValue;
private boolean isFound;
public LongAnyAggregator(BaseLongColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
this.foundValue = 0;
this.isFound = false;
}
@Override
public void aggregate()
{
if (!isFound) {
foundValue = valueSelector.getLong();
isFound = true;
}
}
@Override
public Object get()
{
return foundValue;
}
@Override
public float getFloat()
{
return (float) foundValue;
}
@Override
public long getLong()
{
return foundValue;
}
@Override
public double getDouble()
{
return (double) foundValue;
}
@Override
public void close()
{
// no-op
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
public class LongAnyAggregatorFactory extends SimpleLongAggregatorFactory
{
@JsonCreator
public LongAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public LongAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
protected long nullValue()
{
return 0;
}
@Override
protected Aggregator buildAggregator(BaseLongColumnValueSelector selector)
{
return new LongAnyAggregator(selector);
}
@Override
protected BufferAggregator buildBufferAggregator(BaseLongColumnValueSelector selector)
{
return new LongAnyBufferAggregator(selector);
}
@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongAnyAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new LongAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.LONG_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendString(expression)
.build();
}
@Override
public int getMaxIntermediateSize()
{
return Long.BYTES + Byte.BYTES;
}
@Override
public String toString()
{
return "LongAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import java.nio.ByteBuffer;
/**
* This Aggregator is created by the {@link LongAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class LongAnyBufferAggregator implements BufferAggregator
{
private static final byte BYTE_FLAG_IS_NOT_SET = 0;
private static final byte BYTE_FLAG_IS_SET = 1;
private static final long NULL_VALUE = 0;
private final BaseLongColumnValueSelector valueSelector;
public LongAnyBufferAggregator(BaseLongColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.put(position, BYTE_FLAG_IS_NOT_SET);
buf.putLong(position + Byte.BYTES, NULL_VALUE);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (buf.get(position) == BYTE_FLAG_IS_NOT_SET) {
buf.put(position, BYTE_FLAG_IS_SET);
buf.putLong(position + Byte.BYTES, valueSelector.getLong());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position + Byte.BYTES);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Byte.BYTES);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getLong(position + Byte.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Byte.BYTES);
}
@Override
public void close()
{
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
public class StringAnyAggregator implements Aggregator
{
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes;
private String foundValue;
public StringAnyAggregator(BaseObjectColumnValueSelector valueSelector, int maxStringBytes)
{
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
this.foundValue = null;
}
@Override
public void aggregate()
{
if (foundValue == null) {
final Object object = valueSelector.getObject();
foundValue = DimensionHandlerUtils.convertObjectToString(object);
if (foundValue != null && foundValue.length() > maxStringBytes) {
foundValue = foundValue.substring(0, maxStringBytes);
}
}
}
@Override
public Object get()
{
return StringUtils.chop(foundValue, maxStringBytes);
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("StringAnyAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("StringAnyAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("StringAnyAggregator does not support getDouble()");
}
@Override
public void close()
{
// no-op
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class StringAnyAggregatorFactory extends AggregatorFactory
{
private static final Comparator<String> VALUE_COMPARATOR = Comparator.nullsFirst(Comparator.naturalOrder());
private final String fieldName;
private final String name;
protected final int maxStringBytes;
@JsonCreator
public StringAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("maxStringBytes") Integer maxStringBytes
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
if (maxStringBytes != null && maxStringBytes < 0) {
throw new IAE("maxStringBytes must be greater than 0");
}
this.name = name;
this.fieldName = fieldName;
this.maxStringBytes = maxStringBytes == null
? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
: maxStringBytes;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new StringAnyAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new StringAnyBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), maxStringBytes);
}
@Override
public Comparator getComparator()
{
return StringAnyAggregatorFactory.VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new StringAnyAggregatorFactory(name, name, maxStringBytes);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new StringAnyAggregatorFactory(fieldName, fieldName, maxStringBytes));
}
@Override
public Object deserialize(Object object)
{
return object;
}
@Override
public Object finalizeComputation(@Nullable Object object)
{
return object;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public Integer getMaxStringBytes()
{
return maxStringBytes;
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.STRING_ANY_CACHE_TYPE_ID)
.appendString(fieldName)
.appendInt(maxStringBytes)
.build();
}
@Override
public String getTypeName()
{
return "string";
}
@Override
public int getMaxIntermediateSize()
{
return Integer.BYTES + maxStringBytes;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringAnyAggregatorFactory that = (StringAnyAggregatorFactory) o;
return maxStringBytes == that.maxStringBytes &&
Objects.equals(fieldName, that.fieldName) &&
Objects.equals(name, that.name);
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, name, maxStringBytes);
}
@Override
public String toString()
{
return "StringAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", name='" + name + '\'' +
", maxStringBytes=" + maxStringBytes +
'}';
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.any;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import java.nio.ByteBuffer;
public class StringAnyBufferAggregator implements BufferAggregator
{
private static final int NULL_STRING_LENGTH = -1;
private final BaseObjectColumnValueSelector valueSelector;
private final int maxStringBytes;
public StringAnyBufferAggregator(BaseObjectColumnValueSelector valueSelector, int maxStringBytes)
{
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putInt(position, NULL_STRING_LENGTH);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
int stringSizeBytes = buf.getInt(position);
if (stringSizeBytes < 0) {
final Object object = valueSelector.getObject();
String foundValue = DimensionHandlerUtils.convertObjectToString(object);
if (foundValue != null) {
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position + Integer.BYTES);
mutationBuffer.limit(position + Integer.BYTES + maxStringBytes);
final int len = StringUtils.toUtf8WithLimit(foundValue, mutationBuffer);
mutationBuffer.putInt(position, len);
}
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
ByteBuffer copyBuffer = buf.duplicate();
copyBuffer.position(position);
int stringSizeBytes = copyBuffer.getInt();
if (stringSizeBytes >= 0) {
byte[] valueBytes = new byte[stringSizeBytes];
copyBuffer.get(valueBytes, 0, stringSizeBytes);
return StringUtils.fromUtf8(valueBytes);
} else {
return null;
}
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("StringAnyBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("StringAnyBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("StringAnyBufferAggregator does not support getDouble()");
}
@Override
public void close()
{
// no-op
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.first; package org.apache.druid.query.aggregation.first;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector;
@ -72,7 +73,7 @@ public class StringFirstAggregator implements Aggregator
@Override @Override
public Object get() public Object get()
{ {
return new SerializablePairLongString(firstTime, StringFirstLastUtils.chop(firstValue, maxStringBytes)); return new SerializablePairLongString(firstTime, StringUtils.chop(firstValue, maxStringBytes));
} }
@Override @Override

View File

@ -27,25 +27,11 @@ import org.apache.druid.segment.DimensionHandlerUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class StringFirstLastUtils public class StringFirstLastUtils
{ {
private static final int NULL_VALUE = -1; private static final int NULL_VALUE = -1;
@Nullable
public static String chop(@Nullable final String s, final int maxBytes)
{
if (s == null) {
return null;
} else {
// Shorten firstValue to what could fit in maxBytes as UTF-8.
final byte[] bytes = new byte[maxBytes];
final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes));
return new String(bytes, 0, len, StandardCharsets.UTF_8);
}
}
@Nullable @Nullable
public static SerializablePairLongString readPairFromSelectors( public static SerializablePairLongString readPairFromSelectors(
final BaseLongColumnValueSelector timeSelector, final BaseLongColumnValueSelector timeSelector,

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.last; package org.apache.druid.query.aggregation.last;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
@ -75,7 +76,7 @@ public class StringLastAggregator implements Aggregator
@Override @Override
public Object get() public Object get()
{ {
return new SerializablePairLongString(lastTime, StringFirstLastUtils.chop(lastValue, maxStringBytes)); return new SerializablePairLongString(lastTime, StringUtils.chop(lastValue, maxStringBytes));
} }
@Override @Override

View File

@ -31,8 +31,13 @@ import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Optionality;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
@ -60,12 +65,13 @@ import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
public class EarliestLatestSqlAggregator implements SqlAggregator public class EarliestLatestAnySqlAggregator implements SqlAggregator
{ {
public static final SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST); public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST);
public static final SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST); public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST);
public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE);
enum EarliestOrLatest enum AggregatorType
{ {
EARLIEST { EARLIEST {
@Override @Override
@ -81,7 +87,7 @@ public class EarliestLatestSqlAggregator implements SqlAggregator
case STRING: case STRING:
return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes); return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
default: default:
throw new ISE("Cannot build aggregatorFactory for type[%s]", type); throw new ISE("Cannot build EARLIEST aggregatorFactory for type[%s]", type);
} }
} }
}, },
@ -100,7 +106,26 @@ public class EarliestLatestSqlAggregator implements SqlAggregator
case STRING: case STRING:
return new StringLastAggregatorFactory(name, fieldName, maxStringBytes); return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
default: default:
throw new ISE("Cannot build aggregatorFactory for type[%s]", type); throw new ISE("Cannot build LATEST aggregatorFactory for type[%s]", type);
}
}
},
ANY_VALUE {
@Override
AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
{
switch (type) {
case LONG:
return new LongAnyAggregatorFactory(name, fieldName);
case FLOAT:
return new FloatAnyAggregatorFactory(name, fieldName);
case DOUBLE:
return new DoubleAnyAggregatorFactory(name, fieldName);
case STRING:
return new StringAnyAggregatorFactory(name, fieldName, maxStringBytes);
default:
throw new ISE("Cannot build ANY aggregatorFactory for type[%s]", type);
} }
} }
}; };
@ -113,13 +138,13 @@ public class EarliestLatestSqlAggregator implements SqlAggregator
); );
} }
private final EarliestOrLatest earliestOrLatest; private final AggregatorType aggregatorType;
private final SqlAggFunction function; private final SqlAggFunction function;
private EarliestLatestSqlAggregator(final EarliestOrLatest earliestOrLatest) private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType)
{ {
this.earliestOrLatest = earliestOrLatest; this.aggregatorType = aggregatorType;
this.function = new EarliestLatestSqlAggFunction(earliestOrLatest); this.function = new EarliestLatestSqlAggFunction(aggregatorType);
} }
@Override @Override
@ -183,7 +208,7 @@ public class EarliestLatestSqlAggregator implements SqlAggregator
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()), .collect(Collectors.toList()),
Collections.singletonList( Collections.singletonList(
earliestOrLatest.createAggregatorFactory( aggregatorType.createAggregatorFactory(
aggregatorName, aggregatorName,
fieldName, fieldName,
outputType, outputType,
@ -196,25 +221,27 @@ public class EarliestLatestSqlAggregator implements SqlAggregator
private static class EarliestLatestSqlAggFunction extends SqlAggFunction private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{ {
EarliestLatestSqlAggFunction(EarliestOrLatest earliestOrLatest) EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
{ {
super( super(
earliestOrLatest.name(), aggregatorType.name(),
null, null,
SqlKind.OTHER_FUNCTION, SqlKind.OTHER_FUNCTION,
ReturnTypes.ARG0, ReturnTypes.ARG0,
InferTypes.RETURN_TYPE, InferTypes.RETURN_TYPE,
OperandTypes.or( OperandTypes.or(
OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.BOOLEAN), OperandTypes.NUMERIC,
OperandTypes.BOOLEAN,
OperandTypes.sequence( OperandTypes.sequence(
"'" + earliestOrLatest.name() + "(expr, maxBytesPerString)'\n", "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
OperandTypes.STRING, OperandTypes.STRING,
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL) OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
) )
), ),
SqlFunctionCategory.STRING, SqlFunctionCategory.STRING,
false, false,
false false,
Optionality.FORBIDDEN
); );
} }
} }

View File

@ -36,7 +36,12 @@ import javax.annotation.Nullable;
import java.util.List; import java.util.List;
/** /**
* Abstraction for single column, single argument simple aggregators like sum, avg, min, max * Abstraction for single column, single argument simple aggregators like sum, avg, min, max that:
*
* 1) Can take direct field accesses or expressions as inputs.
* 2) Cannot implicitly cast strings to numbers when using a direct field access.
*
* @see Aggregations#getArgumentsForSimpleAggregator for details on these requirements
*/ */
public abstract class SimpleSqlAggregator implements SqlAggregator public abstract class SimpleSqlAggregator implements SqlAggregator
{ {

View File

@ -36,7 +36,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestAnySqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
@ -119,8 +119,9 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new ApproxCountDistinctSqlAggregator()) .add(new ApproxCountDistinctSqlAggregator())
.add(new AvgSqlAggregator()) .add(new AvgSqlAggregator())
.add(new CountSqlAggregator()) .add(new CountSqlAggregator())
.add(EarliestLatestSqlAggregator.EARLIEST) .add(EarliestLatestAnySqlAggregator.EARLIEST)
.add(EarliestLatestSqlAggregator.LATEST) .add(EarliestLatestAnySqlAggregator.LATEST)
.add(EarliestLatestAnySqlAggregator.ANY_VALUE)
.add(new MinSqlAggregator()) .add(new MinSqlAggregator())
.add(new MaxSqlAggregator()) .add(new MaxSqlAggregator())
.add(new SumSqlAggregator()) .add(new SumSqlAggregator())

View File

@ -43,6 +43,10 @@ import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
@ -1297,6 +1301,118 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
// This test the on-heap version of the AnyAggregator (Double/Float/Long/String)
@Test
public void testAnyAggregator() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT "
+ "ANY_VALUE(cnt), ANY_VALUE(m1), ANY_VALUE(m2), ANY_VALUE(dim1, 10), "
+ "ANY_VALUE(cnt + 1), ANY_VALUE(m1 + 1), ANY_VALUE(dim1 || CAST(cnt AS VARCHAR), 10) "
+ "FROM druid.foo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns(
expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG),
expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT),
expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING)
)
.aggregators(
aggregators(
new LongAnyAggregatorFactory("a0", "cnt"),
new FloatAnyAggregatorFactory("a1", "m1"),
new DoubleAnyAggregatorFactory("a2", "m2"),
new StringAnyAggregatorFactory("a3", "dim1", 10),
new LongAnyAggregatorFactory("a4", "v0"),
new FloatAnyAggregatorFactory("a5", "v1"),
new StringAnyAggregatorFactory("a6", "v2", 10)
)
)
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "", 2L, 2.0f, "1"}) : ImmutableList.of(new Object[]{1L, 1.0f, 1.0, "10.1", 2L, 2.0f, "1"})
);
}
// This test the on-heap version of the AnyAggregator (Double/Float/Long) against numeric columns
// that have null values (when run in sql compatible null mode)
@Test
public void testAnyAggregatorsOnHeapNumericNulls() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
aggregators(
new LongAnyAggregatorFactory("a0", "l1"),
new DoubleAnyAggregatorFactory("a1", "d1"),
new FloatAnyAggregatorFactory("a2", "f1")
)
)
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{7L, 1.0, 1.0f}
)
);
}
// This test the off-heap (buffer) version of the AnyAggregator (Double/Float/Long) against numeric columns
// that have null values (when run in sql compatible null mode)
@Test
public void testAnyAggregatorsOffHeapNumericNulls() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT ANY_VALUE(l1), ANY_VALUE(d1), ANY_VALUE(f1) FROM druid.numfoo GROUP BY dim2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE3)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "_d0")))
.setAggregatorSpecs(
aggregators(
new LongAnyAggregatorFactory("a0", "l1"),
new DoubleAnyAggregatorFactory("a1", "d1"),
new FloatAnyAggregatorFactory("a2", "f1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
NullHandling.sqlCompatible()
? ImmutableList.of(
new Object[]{325323L, 1.7, 0.1f},
new Object[]{0L, 0.0, 0.0f},
new Object[]{7L, 1.0, 1.0f},
new Object[]{null, null, null}
)
: ImmutableList.of(
new Object[]{325323L, 1.7, 0.1f},
new Object[]{7L, 1.0, 1.0f},
new Object[]{0L, 0.0, 0.0f}
)
);
}
@Test @Test
public void testLatestInSubquery() throws Exception public void testLatestInSubquery() throws Exception
{ {
@ -1334,6 +1450,92 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
// This test the off-heap (buffer) version of the AnyAggregator (Double/Float/Long)
@Test
public void testPrimitiveAnyInSubquery() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, ANY_VALUE(m1) AS val1, ANY_VALUE(cnt) AS val2, ANY_VALUE(m2) AS val3 FROM foo GROUP BY dim2)",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(aggregators(
new FloatAnyAggregatorFactory("a0:a", "m1"),
new LongAnyAggregatorFactory("a1:a", "cnt"),
new DoubleAnyAggregatorFactory("a2:a", "m2"))
)
.setPostAggregatorSpecs(
ImmutableList.of(
new FinalizingFieldAccessPostAggregator("a0", "a0:a"),
new FinalizingFieldAccessPostAggregator("a1", "a1:a"),
new FinalizingFieldAccessPostAggregator("a2", "a2:a")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
new DoubleSumAggregatorFactory("_a0", "a0"),
new LongSumAggregatorFactory("_a1", "a1"),
new DoubleSumAggregatorFactory("_a2", "a2")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0})
);
}
// This test the off-heap (buffer) version of the AnyAggregator (String)
@Test
public void testStringAnyInSubquery() throws Exception
{
// Cannot vectorize ANY aggregator.
skipVectorize();
testQuery(
"SELECT SUM(val) FROM (SELECT dim2, ANY_VALUE(dim1, 10) AS val FROM foo GROUP BY dim2)",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(aggregators(new StringAnyAggregatorFactory("a0:a", "dim1", 10)))
.setPostAggregatorSpecs(
ImmutableList.of(
new FinalizingFieldAccessPostAggregator("a0", "a0:a")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil())))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1}
)
);
}
@Test @Test
public void testGroupByLong() throws Exception public void testGroupByLong() throws Exception
{ {