mirror of https://github.com/apache/druid.git
Add stringLast and stringFirst aggregators extension (#5789)
* Add lastString and firstString aggregators extension * Remove duplicated class * Move first-last-string doc page to extensions-contrib * Fix ObjectStrategy compare method * Fix doc bad aggregatos type name * Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery * Add getMaxStringBytes() method to support JSON serialization * Fix null pointer exception at segment creation phase when the string value is null * Control the valueSelector object class on BufferAggregators * Perform all improvements * Add java doc on SerializablePairLongStringSerde * Refactor ObjectStraty compare method * Remove unused ; * Add aggregateCombiner unit tests. Rename BufferAggregators unit tests * Remove unused imports * Add license header * Add class name to java doc class serde * Throw exception if value is unsupported class type * Move first-last-string extension into druid core * Update druid core docs * Fix null pointer exception when pair->string is null * Add null control unit tests * Remove unused imports * Add first/last string folding aggregator on AggregatorsModule to support segment metadata query * Change SerializablePairLongString to extend SerializablePair * Change vars from public to private * Convert vars to primitive type * Clarify compare comment * Change IllegalStateException to ISE * Remove TODO comments * Control possible null pointer exception * Add @Nullable annotation * Remove empty line * Remove unused parameter type * Improve AggregatorCombiner javadocs * Add filterNullValues option at StringLast and StringFirst aggregators * Add filterNullValues option at agg documentation * Fix checkstyle * Update header license * Fix StringFirstAggregatorFactory.VALUE_COMPARATOR * Fix StringFirstAggregatorCombiner * Fix if condition at StringFirstAggregateCombiner * Remove filterNullValues from string first/last aggregators * Add isReset flag in FirstAggregatorCombiner * Change Arrays.asList to Collections.singletonList
This commit is contained in:
parent
297810e7a4
commit
e270362767
|
@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
|
|||
|
||||
### First / Last aggregator
|
||||
|
||||
First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
|
||||
(Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
|
||||
|
||||
Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data.
|
||||
|
||||
|
@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment created with rollup e
|
|||
}
|
||||
```
|
||||
|
||||
#### `stringFirst` aggregator
|
||||
|
||||
`stringFirst` computes the metric value with the minimum timestamp, or `null` if no rows exist.
|
||||
|
||||
```json
|
||||
{
|
||||
"type" : "stringFirst",
|
||||
"name" : <output_name>,
|
||||
"fieldName" : <metric_name>,
|
||||
"maxStringBytes" : <integer> # (optional, defaults to 1024)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### `stringLast` aggregator
|
||||
|
||||
`stringLast` computes the metric value with the maximum timestamp, or `null` if no rows exist.
|
||||
|
||||
```json
|
||||
{
|
||||
"type" : "stringLast",
|
||||
"name" : <output_name>,
|
||||
"fieldName" : <metric_name>,
|
||||
"maxStringBytes" : <integer> # (optional, defaults to 1024)
|
||||
}
|
||||
```
|
||||
|
||||
### JavaScript aggregator
|
||||
|
||||
Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your
|
||||
|
|
|
@ -38,10 +38,13 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
|
|||
import io.druid.query.aggregation.LongMinAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongStringSerde;
|
||||
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
||||
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
|
||||
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
|
||||
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
|
||||
import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||
import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
|
@ -49,6 +52,8 @@ import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
|
|||
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
|
||||
import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
|
||||
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
|
||||
import io.druid.query.aggregation.last.StringLastAggregatorFactory;
|
||||
import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
|
||||
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
|
||||
|
@ -74,7 +79,14 @@ public class AggregatorsModule extends SimpleModule
|
|||
}
|
||||
|
||||
if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
|
||||
ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
|
||||
ComplexMetrics.registerSerde(
|
||||
"preComputedHyperUnique",
|
||||
new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
|
||||
);
|
||||
}
|
||||
|
||||
if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) {
|
||||
ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
|
||||
}
|
||||
|
||||
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
|
||||
|
@ -101,9 +113,13 @@ public class AggregatorsModule extends SimpleModule
|
|||
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "stringFirst", value = StringFirstAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class)
|
||||
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
|
||||
})
|
||||
public interface AggregatorFactoryMixin
|
||||
{
|
||||
|
|
|
@ -94,6 +94,10 @@ public class AggregatorUtil
|
|||
public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
|
||||
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A;
|
||||
|
||||
// StringFirst, StringLast aggregator
|
||||
public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
|
||||
public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
|
||||
|
||||
/**
|
||||
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
|
||||
*
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.collections.SerializablePair;
|
||||
|
||||
public class SerializablePairLongString extends SerializablePair<Long, String>
|
||||
{
|
||||
@JsonCreator
|
||||
public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs)
|
||||
{
|
||||
super(lhs, rhs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||
import io.druid.segment.GenericColumnSerializer;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.data.GenericIndexed;
|
||||
import io.druid.segment.data.ObjectStrategy;
|
||||
import io.druid.segment.serde.ComplexColumnPartSupplier;
|
||||
import io.druid.segment.serde.ComplexMetricExtractor;
|
||||
import io.druid.segment.serde.ComplexMetricSerde;
|
||||
import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
|
||||
import io.druid.segment.writeout.SegmentWriteOutMedium;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
|
||||
* The serialization structure is: Long:Integer:String
|
||||
* <p>
|
||||
* The class is used on first/last String aggregators to store the time and the first/last string.
|
||||
* Long:Integer:String -> Timestamp:StringSize:StringData
|
||||
*/
|
||||
public class SerializablePairLongStringSerde extends ComplexMetricSerde
|
||||
{
|
||||
|
||||
private static final String TYPE_NAME = "serializablePairLongString";
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return TYPE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ComplexMetricExtractor getExtractor()
|
||||
{
|
||||
return new ComplexMetricExtractor()
|
||||
{
|
||||
@Override
|
||||
public Class<SerializablePairLongString> extractedClass()
|
||||
{
|
||||
return SerializablePairLongString.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object extractValue(InputRow inputRow, String metricName)
|
||||
{
|
||||
return inputRow.getRaw(metricName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
|
||||
{
|
||||
final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
|
||||
columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectStrategy getObjectStrategy()
|
||||
{
|
||||
return new ObjectStrategy<SerializablePairLongString>()
|
||||
{
|
||||
@Override
|
||||
public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
|
||||
{
|
||||
return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends SerializablePairLongString> getClazz()
|
||||
{
|
||||
return SerializablePairLongString.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
|
||||
|
||||
long lhs = readOnlyBuffer.getLong();
|
||||
int stringSize = readOnlyBuffer.getInt();
|
||||
|
||||
String lastString = null;
|
||||
if (stringSize > 0) {
|
||||
byte[] stringBytes = new byte[stringSize];
|
||||
readOnlyBuffer.get(stringBytes, 0, stringSize);
|
||||
lastString = StringUtils.fromUtf8(stringBytes);
|
||||
}
|
||||
|
||||
return new SerializablePairLongString(lhs, lastString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes(SerializablePairLongString val)
|
||||
{
|
||||
String rhsString = val.rhs;
|
||||
ByteBuffer bbuf;
|
||||
|
||||
if (rhsString != null) {
|
||||
byte[] rhsBytes = StringUtils.toUtf8(rhsString);
|
||||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
|
||||
bbuf.putLong(val.lhs);
|
||||
bbuf.putInt(Long.BYTES, rhsBytes.length);
|
||||
bbuf.position(Long.BYTES + Integer.BYTES);
|
||||
bbuf.put(rhsBytes);
|
||||
} else {
|
||||
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
|
||||
bbuf.putLong(val.lhs);
|
||||
bbuf.putInt(Long.BYTES, 0);
|
||||
}
|
||||
|
||||
return bbuf.array();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
|
||||
{
|
||||
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
|
||||
}
|
||||
}
|
|
@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.collections.SerializablePair;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.UOE;
|
||||
import io.druid.query.aggregation.AggregateCombiner;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import io.druid.query.aggregation.ObjectAggregateCombiner;
|
||||
import io.druid.segment.ColumnValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
|
||||
{
|
||||
private String firstString;
|
||||
private boolean isReset = false;
|
||||
|
||||
@Override
|
||||
public void reset(ColumnValueSelector selector)
|
||||
{
|
||||
firstString = (String) selector.getObject();
|
||||
isReset = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fold(ColumnValueSelector selector)
|
||||
{
|
||||
if (!isReset) {
|
||||
firstString = (String) selector.getObject();
|
||||
isReset = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getObject()
|
||||
{
|
||||
return firstString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<String> classOfObject()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.segment.BaseLongColumnValueSelector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
|
||||
public class StringFirstAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final BaseObjectColumnValueSelector valueSelector;
|
||||
private final BaseLongColumnValueSelector timeSelector;
|
||||
private final int maxStringBytes;
|
||||
|
||||
protected long firstTime;
|
||||
protected String firstValue;
|
||||
|
||||
public StringFirstAggregator(
|
||||
BaseLongColumnValueSelector timeSelector,
|
||||
BaseObjectColumnValueSelector valueSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
this.valueSelector = valueSelector;
|
||||
this.timeSelector = timeSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
|
||||
firstTime = Long.MAX_VALUE;
|
||||
firstValue = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
long time = timeSelector.getLong();
|
||||
if (time < firstTime) {
|
||||
firstTime = time;
|
||||
Object value = valueSelector.getObject();
|
||||
|
||||
if (value != null) {
|
||||
if (value instanceof String) {
|
||||
firstValue = (String) value;
|
||||
} else if (value instanceof SerializablePairLongString) {
|
||||
firstValue = ((SerializablePairLongString) value).rhs;
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
}
|
||||
|
||||
if (firstValue != null && firstValue.length() > maxStringBytes) {
|
||||
firstValue = firstValue.substring(0, maxStringBytes);
|
||||
}
|
||||
} else {
|
||||
firstValue = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return new SerializablePairLongString(firstTime, firstValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDouble()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
}
|
|
@ -0,0 +1,248 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
import io.druid.query.aggregation.AggregateCombiner;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.AggregatorUtil;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
@JsonTypeName("stringFirst")
|
||||
public class StringFirstAggregatorFactory extends AggregatorFactory
|
||||
{
|
||||
public static final int DEFAULT_MAX_STRING_SIZE = 1024;
|
||||
|
||||
public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
|
||||
((SerializablePairLongString) o1).lhs,
|
||||
((SerializablePairLongString) o2).lhs
|
||||
);
|
||||
|
||||
public static final Comparator<SerializablePairLongString> VALUE_COMPARATOR = (o1, o2) -> {
|
||||
int comparation;
|
||||
|
||||
// First we check if the objects are null
|
||||
if (o1 == null && o2 == null) {
|
||||
comparation = 0;
|
||||
} else if (o1 == null) {
|
||||
comparation = -1;
|
||||
} else if (o2 == null) {
|
||||
comparation = 1;
|
||||
} else {
|
||||
|
||||
// If the objects are not null, we will try to compare using timestamp
|
||||
comparation = o1.lhs.compareTo(o2.lhs);
|
||||
|
||||
// If both timestamp are the same, we try to compare the Strings
|
||||
if (comparation == 0) {
|
||||
|
||||
// First we check if the strings are null
|
||||
if (o1.rhs == null && o2.rhs == null) {
|
||||
comparation = 0;
|
||||
} else if (o1.rhs == null) {
|
||||
comparation = -1;
|
||||
} else if (o2.rhs == null) {
|
||||
comparation = 1;
|
||||
} else {
|
||||
|
||||
// If the strings are not null, we will compare them
|
||||
// Note: This comparation maybe doesn't make sense to first/last aggregators
|
||||
comparation = o1.rhs.compareTo(o2.rhs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return comparation;
|
||||
};
|
||||
|
||||
private final String fieldName;
|
||||
private final String name;
|
||||
protected final int maxStringBytes;
|
||||
|
||||
@JsonCreator
|
||||
public StringFirstAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("maxStringBytes") Integer maxStringBytes
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new StringFirstAggregator(
|
||||
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
|
||||
metricFactory.makeColumnValueSelector(fieldName),
|
||||
maxStringBytes
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new StringFirstBufferAggregator(
|
||||
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
|
||||
metricFactory.makeColumnValueSelector(fieldName),
|
||||
maxStringBytes
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
return VALUE_COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
return TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregateCombiner makeAggregateCombiner()
|
||||
{
|
||||
return new StringFirstAggregateCombiner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns()
|
||||
{
|
||||
return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
Map map = (Map) object;
|
||||
return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return ((SerializablePairLongString) object).rhs;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName()
|
||||
{
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Integer getMaxStringBytes()
|
||||
{
|
||||
return maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID)
|
||||
.appendString(fieldName)
|
||||
.appendInt(maxStringBytes)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "serializablePairLongString";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
return Long.BYTES + Integer.BYTES + maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
|
||||
|
||||
return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(name, fieldName, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "StringFirstAggregatorFactory{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
", maxStringBytes=" + maxStringBytes + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import io.druid.segment.BaseLongColumnValueSelector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class StringFirstBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final BaseLongColumnValueSelector timeSelector;
|
||||
private final BaseObjectColumnValueSelector valueSelector;
|
||||
private final int maxStringBytes;
|
||||
|
||||
/**
 * Stores the selectors and the string size limit used by this aggregator.
 *
 * @param timeSelector   selector the row timestamp is read from
 * @param valueSelector  selector the row value is read from
 * @param maxStringBytes limit applied to the string before it is written to the buffer
 */
public StringFirstBufferAggregator(
    final BaseLongColumnValueSelector timeSelector,
    final BaseObjectColumnValueSelector valueSelector,
    final int maxStringBytes
)
{
  this.maxStringBytes = maxStringBytes;
  this.valueSelector = valueSelector;
  this.timeSelector = timeSelector;
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, Long.MAX_VALUE);
|
||||
buf.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
Object value = valueSelector.getObject();
|
||||
|
||||
long time = timeSelector.getLong();
|
||||
String firstString = null;
|
||||
|
||||
if (value != null) {
|
||||
if (value instanceof SerializablePairLongString) {
|
||||
SerializablePairLongString serializablePair = (SerializablePairLongString) value;
|
||||
time = serializablePair.lhs;
|
||||
firstString = serializablePair.rhs;
|
||||
} else if (value instanceof String) {
|
||||
firstString = (String) value;
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
long lastTime = mutationBuffer.getLong(position);
|
||||
|
||||
if (time < lastTime) {
|
||||
if (firstString != null) {
|
||||
if (firstString.length() > maxStringBytes) {
|
||||
firstString = firstString.substring(0, maxStringBytes);
|
||||
}
|
||||
|
||||
byte[] valueBytes = StringUtils.toUtf8(firstString);
|
||||
|
||||
mutationBuffer.putLong(position, time);
|
||||
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
|
||||
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.put(valueBytes);
|
||||
} else {
|
||||
mutationBuffer.putLong(position, time);
|
||||
mutationBuffer.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
Long timeValue = mutationBuffer.getLong(position);
|
||||
int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
|
||||
|
||||
SerializablePairLongString serializablePair;
|
||||
|
||||
if (stringSizeBytes > 0) {
|
||||
byte[] valueBytes = new byte[stringSizeBytes];
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.get(valueBytes, 0, stringSizeBytes);
|
||||
serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
|
||||
} else {
|
||||
serializablePair = new SerializablePairLongString(timeValue, null);
|
||||
}
|
||||
|
||||
return serializablePair;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDouble(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
|
||||
{
|
||||
inspector.visit("timeSelector", timeSelector);
|
||||
inspector.visit("valueSelector", valueSelector);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@JsonTypeName("stringFirstFold")
|
||||
public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory
|
||||
{
|
||||
public StringFirstFoldingAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("maxStringBytes") Integer maxStringBytes
|
||||
)
|
||||
{
|
||||
super(name, fieldName, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
|
||||
return new StringFirstAggregator(null, null, maxStringBytes)
|
||||
{
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
|
||||
if (pair != null && pair.lhs < firstTime) {
|
||||
firstTime = pair.lhs;
|
||||
firstValue = pair.rhs;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
|
||||
return new StringFirstBufferAggregator(null, null, maxStringBytes)
|
||||
{
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
|
||||
|
||||
if (pair != null && pair.lhs != null) {
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
long lastTime = mutationBuffer.getLong(position);
|
||||
|
||||
if (pair.lhs < lastTime) {
|
||||
mutationBuffer.putLong(position, pair.lhs);
|
||||
|
||||
if (pair.rhs != null) {
|
||||
byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
|
||||
|
||||
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.put(valueBytes);
|
||||
} else {
|
||||
mutationBuffer.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
|
||||
{
|
||||
inspector.visit("selector", selector);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -22,8 +22,8 @@ package io.druid.query.aggregation.last;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.collections.SerializablePair;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.UOE;
|
||||
import io.druid.query.aggregation.AggregateCombiner;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import io.druid.query.aggregation.ObjectAggregateCombiner;
|
||||
import io.druid.segment.ColumnValueSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class StringLastAggregateCombiner extends ObjectAggregateCombiner<String>
|
||||
{
|
||||
private String lastString;
|
||||
|
||||
@Override
|
||||
public void reset(ColumnValueSelector selector)
|
||||
{
|
||||
lastString = (String) selector.getObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fold(ColumnValueSelector selector)
|
||||
{
|
||||
lastString = (String) selector.getObject();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getObject()
|
||||
{
|
||||
return lastString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<String> classOfObject()
|
||||
{
|
||||
return String.class;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.segment.BaseLongColumnValueSelector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
|
||||
public class StringLastAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final BaseObjectColumnValueSelector valueSelector;
|
||||
private final BaseLongColumnValueSelector timeSelector;
|
||||
private final int maxStringBytes;
|
||||
|
||||
protected long lastTime;
|
||||
protected String lastValue;
|
||||
|
||||
public StringLastAggregator(
|
||||
BaseLongColumnValueSelector timeSelector,
|
||||
BaseObjectColumnValueSelector valueSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
this.valueSelector = valueSelector;
|
||||
this.timeSelector = timeSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
|
||||
lastTime = Long.MIN_VALUE;
|
||||
lastValue = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
long time = timeSelector.getLong();
|
||||
if (time >= lastTime) {
|
||||
lastTime = time;
|
||||
Object value = valueSelector.getObject();
|
||||
|
||||
if (value != null) {
|
||||
if (value instanceof String) {
|
||||
lastValue = (String) value;
|
||||
} else if (value instanceof SerializablePairLongString) {
|
||||
lastValue = ((SerializablePairLongString) value).rhs;
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
}
|
||||
|
||||
if (lastValue != null && lastValue.length() > maxStringBytes) {
|
||||
lastValue = lastValue.substring(0, maxStringBytes);
|
||||
}
|
||||
} else {
|
||||
lastValue = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return new SerializablePairLongString(lastTime, lastValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDouble()
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
}
|
|
@ -0,0 +1,207 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.query.aggregation.AggregateCombiner;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.AggregatorUtil;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
@JsonTypeName("stringLast")
|
||||
public class StringLastAggregatorFactory extends AggregatorFactory
|
||||
{
|
||||
private final String fieldName;
|
||||
private final String name;
|
||||
protected final int maxStringBytes;
|
||||
|
||||
@JsonCreator
|
||||
public StringLastAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("maxStringBytes") Integer maxStringBytes
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.maxStringBytes = maxStringBytes == null
|
||||
? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
|
||||
: maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new StringLastAggregator(
|
||||
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
|
||||
metricFactory.makeColumnValueSelector(fieldName),
|
||||
maxStringBytes
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new StringLastBufferAggregator(
|
||||
metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
|
||||
metricFactory.makeColumnValueSelector(fieldName),
|
||||
maxStringBytes
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
return StringFirstAggregatorFactory.VALUE_COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
return StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregateCombiner makeAggregateCombiner()
|
||||
{
|
||||
return new StringLastAggregateCombiner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns()
|
||||
{
|
||||
return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
Map map = (Map) object;
|
||||
return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return ((SerializablePairLongString) object).rhs;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName()
|
||||
{
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Integer getMaxStringBytes()
|
||||
{
|
||||
return maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID)
|
||||
.appendString(fieldName)
|
||||
.appendInt(maxStringBytes)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "serializablePairLongString";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
return Long.BYTES + Integer.BYTES + maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
|
||||
|
||||
return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(name, fieldName, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "StringFirstAggregatorFactory{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
", maxStringBytes=" + maxStringBytes + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import io.druid.segment.BaseLongColumnValueSelector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class StringLastBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final BaseLongColumnValueSelector timeSelector;
|
||||
private final BaseObjectColumnValueSelector valueSelector;
|
||||
private final int maxStringBytes;
|
||||
|
||||
public StringLastBufferAggregator(
|
||||
BaseLongColumnValueSelector timeSelector,
|
||||
BaseObjectColumnValueSelector valueSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
this.timeSelector = timeSelector;
|
||||
this.valueSelector = valueSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, Long.MIN_VALUE);
|
||||
buf.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
Object value = valueSelector.getObject();
|
||||
|
||||
long time = timeSelector.getLong();
|
||||
String lastString = null;
|
||||
|
||||
if (value != null) {
|
||||
if (value instanceof SerializablePairLongString) {
|
||||
SerializablePairLongString serializablePair = (SerializablePairLongString) value;
|
||||
time = serializablePair.lhs;
|
||||
lastString = serializablePair.rhs;
|
||||
} else if (value instanceof String) {
|
||||
lastString = (String) value;
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
long lastTime = mutationBuffer.getLong(position);
|
||||
|
||||
if (time >= lastTime) {
|
||||
if (lastString != null) {
|
||||
if (lastString.length() > maxStringBytes) {
|
||||
lastString = lastString.substring(0, maxStringBytes);
|
||||
}
|
||||
|
||||
byte[] valueBytes = StringUtils.toUtf8(lastString);
|
||||
|
||||
mutationBuffer.putLong(position, time);
|
||||
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
|
||||
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.put(valueBytes);
|
||||
} else {
|
||||
mutationBuffer.putLong(position, time);
|
||||
mutationBuffer.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
Long timeValue = mutationBuffer.getLong(position);
|
||||
Integer stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
|
||||
|
||||
SerializablePairLongString serializablePair;
|
||||
|
||||
if (stringSizeBytes > 0) {
|
||||
byte[] valueBytes = new byte[stringSizeBytes];
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.get(valueBytes, 0, stringSizeBytes);
|
||||
serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
|
||||
} else {
|
||||
serializablePair = new SerializablePairLongString(timeValue, null);
|
||||
}
|
||||
|
||||
return serializablePair;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getDouble(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// no resources to cleanup
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
|
||||
{
|
||||
inspector.visit("timeSelector", timeSelector);
|
||||
inspector.visit("valueSelector", valueSelector);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import io.druid.segment.BaseObjectColumnValueSelector;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@JsonTypeName("stringLastFold")
|
||||
public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory
|
||||
{
|
||||
public StringLastFoldingAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") final String fieldName,
|
||||
@JsonProperty("maxStringBytes") Integer maxStringBytes
|
||||
)
|
||||
{
|
||||
super(name, fieldName, maxStringBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
|
||||
return new StringLastAggregator(null, null, maxStringBytes)
|
||||
{
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
|
||||
if (pair != null && pair.lhs >= lastTime) {
|
||||
lastTime = pair.lhs;
|
||||
lastValue = pair.rhs;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
|
||||
return new StringLastBufferAggregator(null, null, maxStringBytes)
|
||||
{
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
|
||||
if (pair != null && pair.lhs != null) {
|
||||
ByteBuffer mutationBuffer = buf.duplicate();
|
||||
mutationBuffer.position(position);
|
||||
|
||||
long lastTime = mutationBuffer.getLong(position);
|
||||
|
||||
if (pair.lhs >= lastTime) {
|
||||
mutationBuffer.putLong(position, pair.lhs);
|
||||
if (pair.rhs != null) {
|
||||
byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
|
||||
|
||||
mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
|
||||
mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
|
||||
mutationBuffer.put(valueBytes);
|
||||
} else {
|
||||
mutationBuffer.putInt(position + Long.BYTES, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
|
||||
{
|
||||
inspector.visit("selector", selector);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.aggregation.TestLongColumnSelector;
|
||||
import io.druid.query.aggregation.TestObjectColumnSelector;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class StringFirstAggregationTest
|
||||
{
|
||||
private final Integer MAX_STRING_SIZE = 1024;
|
||||
private StringFirstAggregatorFactory stringLastAggFactory;
|
||||
private StringFirstAggregatorFactory combiningAggFactory;
|
||||
private ColumnSelectorFactory colSelectorFactory;
|
||||
private TestLongColumnSelector timeSelector;
|
||||
private TestObjectColumnSelector<String> valueSelector;
|
||||
private TestObjectColumnSelector objectSelector;
|
||||
|
||||
private String[] strings = {"1111", "2222", "3333", null, "4444"};
|
||||
private long[] times = {8224, 6879, 2436, 3546, 7888};
|
||||
private SerializablePairLongString[] pairs = {
|
||||
new SerializablePairLongString(52782L, "AAAA"),
|
||||
new SerializablePairLongString(65492L, "BBBB"),
|
||||
new SerializablePairLongString(69134L, "CCCC"),
|
||||
new SerializablePairLongString(11111L, "DDDD"),
|
||||
new SerializablePairLongString(51223L, null)
|
||||
};
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
|
||||
combiningAggFactory = (StringFirstAggregatorFactory) stringLastAggFactory.getCombiningFactory();
|
||||
timeSelector = new TestLongColumnSelector(times);
|
||||
valueSelector = new TestObjectColumnSelector<>(strings);
|
||||
objectSelector = new TestObjectColumnSelector<>(pairs);
|
||||
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
|
||||
EasyMock.replay(colSelectorFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastAggregator()
|
||||
{
|
||||
StringFirstAggregator agg = (StringFirstAggregator) stringLastAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get();
|
||||
|
||||
Assert.assertEquals(strings[2], result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastBufferAggregator()
|
||||
{
|
||||
StringFirstBufferAggregator agg = (StringFirstBufferAggregator) stringLastAggFactory.factorizeBuffered(
|
||||
colSelectorFactory);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
|
||||
agg.init(buffer, 0);
|
||||
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
|
||||
|
||||
Assert.assertEquals(strings[2], result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCombine()
|
||||
{
|
||||
SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
|
||||
SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
|
||||
Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastCombiningAggregator()
|
||||
{
|
||||
StringFirstAggregator agg = (StringFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get();
|
||||
Pair<Long, String> expected = pairs[3];
|
||||
|
||||
Assert.assertEquals(expected.lhs, result.lhs);
|
||||
Assert.assertEquals(expected.rhs, result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringFirstCombiningBufferAggregator()
|
||||
{
|
||||
StringFirstBufferAggregator agg = (StringFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
|
||||
colSelectorFactory);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
|
||||
agg.init(buffer, 0);
|
||||
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
|
||||
Pair<Long, String> expected = pairs[3];
|
||||
|
||||
Assert.assertEquals(expected.lhs, result.lhs);
|
||||
Assert.assertEquals(expected.rhs, result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringFirstAggregateCombiner()
|
||||
{
|
||||
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
|
||||
TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringFirstAggregateCombiner stringFirstAggregateCombiner =
|
||||
(StringFirstAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
|
||||
|
||||
stringFirstAggregateCombiner.reset(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
|
||||
|
||||
columnSelector.increment();
|
||||
stringFirstAggregateCombiner.fold(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
|
||||
|
||||
stringFirstAggregateCombiner.reset(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
|
||||
}
|
||||
|
||||
private void aggregate(
|
||||
StringFirstAggregator agg
|
||||
)
|
||||
{
|
||||
agg.aggregate();
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
objectSelector.increment();
|
||||
}
|
||||
|
||||
private void aggregate(
|
||||
StringFirstBufferAggregator agg,
|
||||
ByteBuffer buff,
|
||||
int position
|
||||
)
|
||||
{
|
||||
agg.aggregate(buff, position);
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
objectSelector.increment();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.aggregation.TestLongColumnSelector;
|
||||
import io.druid.query.aggregation.TestObjectColumnSelector;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class StringFirstBufferAggregatorTest
|
||||
{
|
||||
private void aggregateBuffer(
|
||||
TestLongColumnSelector timeSelector,
|
||||
TestObjectColumnSelector valueSelector,
|
||||
BufferAggregator agg,
|
||||
ByteBuffer buf,
|
||||
int position
|
||||
)
|
||||
{
|
||||
agg.aggregate(buf, position);
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferAggregate() throws Exception
|
||||
{
|
||||
|
||||
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
|
||||
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
|
||||
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
|
||||
|
||||
|
||||
Assert.assertEquals("expectec last string value", strings[0], sp.rhs);
|
||||
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullBufferAggregate() throws Exception
|
||||
{
|
||||
|
||||
final long[] timestamps = {2222L, 1111L, 3333L, 4444L, 5555L};
|
||||
final String[] strings = {null, "AAAA", "BBBB", "DDDD", "EEEE"};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
|
||||
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
|
||||
|
||||
|
||||
Assert.assertEquals("expectec last string value", strings[1], sp.rhs);
|
||||
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[1]), new Long(sp.lhs));
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testNoStringValue()
|
||||
{
|
||||
|
||||
final long[] timestamps = {1526724000L, 1526724600L};
|
||||
final Double[] doubles = {null, 2.00};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
|
||||
|
||||
StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.first;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.java.util.common.DateTimes;
|
||||
import io.druid.java.util.common.granularity.Granularities;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQueryEngine;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class StringFirstTimeseriesQueryTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testTopNWithDistinctCountAgg() throws Exception
|
||||
{
|
||||
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
|
||||
|
||||
String visitor_id = "visitor_id";
|
||||
String client_type = "client_type";
|
||||
|
||||
IncrementalIndex index = new IncrementalIndex.Builder()
|
||||
.setIndexSchema(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withQueryGranularity(Granularities.SECOND)
|
||||
.withMetrics(new CountAggregatorFactory("cnt"))
|
||||
.withMetrics(new StringFirstAggregatorFactory(
|
||||
"last_client_type", "client_type", 1024)
|
||||
)
|
||||
.build()
|
||||
)
|
||||
.setMaxRowCount(1000)
|
||||
.buildOnheap();
|
||||
|
||||
|
||||
DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
|
||||
long timestamp = time.getMillis();
|
||||
|
||||
DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
|
||||
long timestamp1 = time1.getMillis();
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp1,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "android")
|
||||
)
|
||||
);
|
||||
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Lists.newArrayList(
|
||||
new StringFirstAggregatorFactory(
|
||||
"last_client_type", client_type, 1024
|
||||
)
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
final Iterable<Result<TimeseriesResultValue>> results =
|
||||
engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
|
||||
|
||||
List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
time,
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("last_client_type", new SerializablePairLongString(timestamp, "iphone"))
|
||||
)
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.aggregation.TestLongColumnSelector;
|
||||
import io.druid.query.aggregation.TestObjectColumnSelector;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class StringLastAggregationTest
|
||||
{
|
||||
private final Integer MAX_STRING_SIZE = 1024;
|
||||
private StringLastAggregatorFactory stringLastAggFactory;
|
||||
private StringLastAggregatorFactory combiningAggFactory;
|
||||
private ColumnSelectorFactory colSelectorFactory;
|
||||
private TestLongColumnSelector timeSelector;
|
||||
private TestObjectColumnSelector<String> valueSelector;
|
||||
private TestObjectColumnSelector objectSelector;
|
||||
|
||||
private String[] strings = {"1111", "2222", "3333", null, "4444"};
|
||||
private long[] times = {8224, 6879, 2436, 3546, 7888};
|
||||
private SerializablePairLongString[] pairs = {
|
||||
new SerializablePairLongString(52782L, "AAAA"),
|
||||
new SerializablePairLongString(65492L, "BBBB"),
|
||||
new SerializablePairLongString(69134L, "CCCC"),
|
||||
new SerializablePairLongString(11111L, "DDDD"),
|
||||
new SerializablePairLongString(51223L, null)
|
||||
};
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
|
||||
combiningAggFactory = (StringLastAggregatorFactory) stringLastAggFactory.getCombiningFactory();
|
||||
timeSelector = new TestLongColumnSelector(times);
|
||||
valueSelector = new TestObjectColumnSelector<>(strings);
|
||||
objectSelector = new TestObjectColumnSelector<>(pairs);
|
||||
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
|
||||
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
|
||||
EasyMock.replay(colSelectorFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastAggregator()
|
||||
{
|
||||
StringLastAggregator agg = (StringLastAggregator) stringLastAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get();
|
||||
|
||||
Assert.assertEquals(strings[0], result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastBufferAggregator()
|
||||
{
|
||||
StringLastBufferAggregator agg = (StringLastBufferAggregator) stringLastAggFactory.factorizeBuffered(
|
||||
colSelectorFactory);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
|
||||
agg.init(buffer, 0);
|
||||
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
|
||||
|
||||
Assert.assertEquals(strings[0], result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCombine()
|
||||
{
|
||||
SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
|
||||
SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
|
||||
Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastCombiningAggregator()
|
||||
{
|
||||
StringLastAggregator agg = (StringLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
aggregate(agg);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get();
|
||||
Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
|
||||
|
||||
Assert.assertEquals(expected.lhs, result.lhs);
|
||||
Assert.assertEquals(expected.rhs, result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastCombiningBufferAggregator()
|
||||
{
|
||||
StringLastBufferAggregator agg = (StringLastBufferAggregator) combiningAggFactory.factorizeBuffered(
|
||||
colSelectorFactory);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
|
||||
agg.init(buffer, 0);
|
||||
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
aggregate(agg, buffer, 0);
|
||||
|
||||
Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
|
||||
Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
|
||||
|
||||
Assert.assertEquals(expected.lhs, result.lhs);
|
||||
Assert.assertEquals(expected.rhs, result.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringLastAggregateCombiner()
|
||||
{
|
||||
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
|
||||
TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringLastAggregateCombiner stringFirstAggregateCombiner =
|
||||
(StringLastAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
|
||||
|
||||
stringFirstAggregateCombiner.reset(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
|
||||
|
||||
columnSelector.increment();
|
||||
stringFirstAggregateCombiner.fold(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
|
||||
|
||||
stringFirstAggregateCombiner.reset(columnSelector);
|
||||
|
||||
Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
|
||||
}
|
||||
|
||||
private void aggregate(
|
||||
StringLastAggregator agg
|
||||
)
|
||||
{
|
||||
agg.aggregate();
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
objectSelector.increment();
|
||||
}
|
||||
|
||||
private void aggregate(
|
||||
StringLastBufferAggregator agg,
|
||||
ByteBuffer buff,
|
||||
int position
|
||||
)
|
||||
{
|
||||
agg.aggregate(buff, position);
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
objectSelector.increment();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.aggregation.TestLongColumnSelector;
|
||||
import io.druid.query.aggregation.TestObjectColumnSelector;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class StringLastBufferAggregatorTest
|
||||
{
|
||||
private void aggregateBuffer(
|
||||
TestLongColumnSelector timeSelector,
|
||||
TestObjectColumnSelector valueSelector,
|
||||
BufferAggregator agg,
|
||||
ByteBuffer buf,
|
||||
int position
|
||||
)
|
||||
{
|
||||
agg.aggregate(buf, position);
|
||||
timeSelector.increment();
|
||||
valueSelector.increment();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferAggregate() throws Exception
|
||||
{
|
||||
|
||||
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
|
||||
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringLastBufferAggregator agg = new StringLastBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
|
||||
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
|
||||
|
||||
|
||||
Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
|
||||
Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullBufferAggregate() throws Exception
|
||||
{
|
||||
|
||||
final long[] timestamps = {1111L, 2222L, 6666L, 4444L, 5555L};
|
||||
final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
|
||||
|
||||
StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringLastBufferAggregator agg = new StringLastBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
|
||||
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
|
||||
|
||||
|
||||
Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
|
||||
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs));
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testNoStringValue()
|
||||
{
|
||||
|
||||
final long[] timestamps = {1526724000L, 1526724600L};
|
||||
final Double[] doubles = {null, 2.00};
|
||||
Integer maxStringBytes = 1024;
|
||||
|
||||
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
|
||||
TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
|
||||
|
||||
StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
|
||||
"billy", "billy", maxStringBytes
|
||||
);
|
||||
|
||||
StringLastBufferAggregator agg = new StringLastBufferAggregator(
|
||||
longColumnSelector,
|
||||
objectColumnSelector,
|
||||
maxStringBytes
|
||||
);
|
||||
|
||||
String testString = "ZZZZ";
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
|
||||
buf.putLong(1526728500L);
|
||||
buf.putInt(testString.length());
|
||||
buf.put(testString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
int position = 0;
|
||||
|
||||
agg.init(buf, position);
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < timestamps.length; i++) {
|
||||
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.last;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.java.util.common.DateTimes;
|
||||
import io.druid.java.util.common.granularity.Granularities;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.SerializablePairLongString;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQueryEngine;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class StringLastTimeseriesQueryTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testTopNWithDistinctCountAgg() throws Exception
|
||||
{
|
||||
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
|
||||
|
||||
String visitor_id = "visitor_id";
|
||||
String client_type = "client_type";
|
||||
|
||||
IncrementalIndex index = new IncrementalIndex.Builder()
|
||||
.setIndexSchema(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withQueryGranularity(Granularities.SECOND)
|
||||
.withMetrics(new CountAggregatorFactory("cnt"))
|
||||
.withMetrics(new StringLastAggregatorFactory(
|
||||
"last_client_type", "client_type", 1024)
|
||||
)
|
||||
.build()
|
||||
)
|
||||
.setMaxRowCount(1000)
|
||||
.buildOnheap();
|
||||
|
||||
|
||||
DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
|
||||
long timestamp = time.getMillis();
|
||||
|
||||
DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
|
||||
long timestamp1 = time1.getMillis();
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp1,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "android")
|
||||
)
|
||||
);
|
||||
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Lists.newArrayList(
|
||||
new StringLastAggregatorFactory(
|
||||
"last_client_type", client_type, 1024
|
||||
)
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
final Iterable<Result<TimeseriesResultValue>> results =
|
||||
engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
|
||||
|
||||
List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
time,
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"last_client_type",
|
||||
new SerializablePairLongString(timestamp1, "android")
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue