Adding double colums supports (#4491)

* add double columns support

* Fix numbers and expected results in UTs

* adding float aggregators

* fix IT expected test results

* fix comments

* more fixes

* fix comp

* fix test

* refactor double and float aggregator factories

* fix

* fix UTs

* fix comments

* clean unused code

* fix more comments

* undo unnecessary changes

* fix null issue

* refactor TopNColumnSelectorStrategyFactory

* fix docs

* refactor NumericTopNColumnSelectorStrategy

* fix return

* fix comments

* handle the null case in DimesionIndexer

* more null fixing

* cosmetic changes
This commit is contained in:
Slim 2017-07-20 00:14:14 -07:00 committed by Roman Leventov
parent b4230ebb3c
commit 71e7a4c054
245 changed files with 7798 additions and 2000 deletions

View File

@ -23,10 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import java.util.Collections;
@ -38,14 +35,7 @@ import java.util.regex.Pattern;
*/
public class MapBasedRow implements Row
{
private static final Logger log = new Logger(MapBasedRow.class);
private static final Function<Object, String> TO_STRING_INCLUDING_NULL = new Function<Object, String>() {
@Override
public String apply(final Object o)
{
return String.valueOf(o);
}
};
private static final Function<Object, String> TO_STRING_INCLUDING_NULL = String::valueOf;
private final DateTime timestamp;
private final Map<String, Object> event;
@ -159,6 +149,29 @@ public class MapBasedRow implements Row
}
}
@Override
public double getDoubleMetric(String metric)
{
Object metricValue = event.get(metric);
if (metricValue == null) {
return 0.0d;
}
if (metricValue instanceof Number) {
return ((Number) metricValue).doubleValue();
} else if (metricValue instanceof String) {
try {
return Double.valueOf(((String) metricValue).replace(",", ""));
}
catch (Exception e) {
throw new ParseException(e, "Unable to parse metrics[%s], value[%s]", metric, metricValue);
}
} else {
throw new ParseException("Unknown type[%s]", metricValue.getClass());
}
}
@Override
public String toString()
{

View File

@ -90,4 +90,14 @@ public interface Row extends Comparable<Row>
* @return the long value for the provided column name.
*/
public long getLongMetric(String metric);
/**
* Returns the double value of the given metric column.
* <p/>
*
* @param metric the column name of the metric requested
*
* @return the double value for the provided column name.
*/
public double getDoubleMetric(String metric);
}

View File

@ -35,6 +35,7 @@ import io.druid.java.util.common.StringUtils;
@JsonSubTypes.Type(name = DimensionSchema.STRING_TYPE_NAME, value = StringDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.LONG_TYPE_NAME, value = LongDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class),
})
public abstract class DimensionSchema
@ -43,6 +44,7 @@ public abstract class DimensionSchema
public static final String LONG_TYPE_NAME = "long";
public static final String FLOAT_TYPE_NAME = "float";
public static final String SPATIAL_TYPE_NAME = "spatial";
public static final String DOUBLE_TYPE_NAME = "double";
// main druid and druid-api should really use the same ValueType enum.
@ -52,6 +54,7 @@ public abstract class DimensionSchema
FLOAT,
LONG,
STRING,
DOUBLE,
COMPLEX;
@JsonValue

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DoubleDimensionSchema extends DimensionSchema
{
@JsonCreator
public DoubleDimensionSchema(@JsonProperty("name") String name)
{
super(name, null);
}
@Override
public String getTypeName()
{
return DimensionSchema.DOUBLE_TYPE_NAME;
}
@Override
public ValueType getValueType()
{
return ValueType.DOUBLE;
}
}

View File

@ -29,6 +29,7 @@ import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.RoaringBitmapFactory;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.DruidDoublePredicate;
import io.druid.query.filter.DruidFloatPredicate;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
@ -95,6 +96,12 @@ public class DimensionPredicateFilterBenchmark
{
return DruidFloatPredicate.ALWAYS_FALSE;
}
@Override
public DruidDoublePredicate makeDoublePredicate()
{
return DruidDoublePredicate.ALWAYS_FALSE;
}
},
null
);

View File

@ -238,6 +238,11 @@ public class ExpressionBenchmark
throw new UnsupportedOperationException();
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{

View File

@ -44,6 +44,7 @@ import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DruidDoublePredicate;
import io.druid.query.filter.DruidFloatPredicate;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
@ -633,6 +634,12 @@ public class FilterPartitionBenchmark
{
return DruidFloatPredicate.ALWAYS_FALSE;
}
@Override
public DruidDoublePredicate makeDoublePredicate()
{
return DruidDoublePredicate.ALWAYS_FALSE;
}
};
return new NoBitmapDimensionPredicateFilter(dimension, predicateFactory, extractionFn);

View File

@ -12,7 +12,7 @@ of OLAP data.
For more detailed information:
* Every row in Druid must have a timestamp. Data is always partitioned by time, and every query has a time filter. Query results can also be broken down by time buckets like minutes, hours, days, and so on.
* Dimensions are fields that can be filtered on or grouped by. They are always single Strings, arrays of Strings, single Longs, or single Floats.
* Dimensions are fields that can be filtered on or grouped by. They are always single Strings, arrays of Strings, single Longs, single Doubles or single Floats.
* Metrics are fields that can be aggregated. They are often stored as numbers (integers or floats) but can also be stored as complex objects like HyperLogLog sketches or approximate histogram sketches.
Typical production tables (or datasources as they are known in Druid) have fewer than 100 dimensions and fewer
@ -22,7 +22,7 @@ Below, we outline some best practices with schema design:
## Numeric dimensions
If the user wishes to ingest a column as a numeric-typed dimension (Long or Float), it is necessary to specify the type of the column in the `dimensions` section of the `dimensionsSpec`. If the type is omitted, Druid will ingest a column as the default String type.
If the user wishes to ingest a column as a numeric-typed dimension (Long, Double or Float), it is necessary to specify the type of the column in the `dimensions` section of the `dimensionsSpec`. If the type is omitted, Druid will ingest a column as the default String type.
There are performance tradeoffs between string and numeric columns. Numeric columns are generally faster to group on
than string columns. But unlike string columns, numeric columns don't have indexes, so they are generally slower to

View File

@ -36,12 +36,20 @@ computes the sum of values as a 64-bit, signed integer
#### `doubleSum` aggregator
Computes the sum of values as 64-bit floating point value. Similar to `longSum`
Computes and stores the sum of values as 64-bit floating point value. Similar to `longSum`
```json
{ "type" : "doubleSum", "name" : <output_name>, "fieldName" : <metric_name> }
```
#### `floatSum` aggregator
Computes and stores the sum of values as 32-bit floating point value. Similar to `longSum` and `doubleSum`
```json
{ "type" : "floatSum", "name" : <output_name>, "fieldName" : <metric_name> }
```
### Min / Max aggregators
#### `doubleMin` aggregator
@ -60,6 +68,22 @@ Computes the sum of values as 64-bit floating point value. Similar to `longSum`
{ "type" : "doubleMax", "name" : <output_name>, "fieldName" : <metric_name> }
```
#### `floatMin` aggregator
`floatMin` computes the minimum of all metric values and Float.POSITIVE_INFINITY
```json
{ "type" : "floatMin", "name" : <output_name>, "fieldName" : <metric_name> }
```
#### `floatMax` aggregator
`floatMax` computes the maximum of all metric values and Float.NEGATIVE_INFINITY
```json
{ "type" : "floatMax", "name" : <output_name>, "fieldName" : <metric_name> }
```
#### `longMin` aggregator
`longMin` computes the minimum of all metric values and Long.MAX_VALUE
@ -106,6 +130,30 @@ Note that queries with first/last aggregators on a segment created with rollup e
}
```
#### `floatFirst` aggregator
`floatFirst` computes the metric value with the minimum timestamp or 0 if no row exist
```json
{
"type" : "floatFirst",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `floatLast` aggregator
`floatLast` computes the metric value with the maximum timestamp or 0 if no row exist
```json
{
"type" : "floatLast",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `longFirst` aggregator
`longFirst` computes the metric value with the minimum timestamp or 0 if no row exist

View File

@ -75,4 +75,10 @@ public class DistinctCountAggregator implements Aggregator
{
return (long) mutableBitmap.size();
}
@Override
public double getDouble()
{
return (double) mutableBitmap.size();
}
}

View File

@ -23,9 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
@ -43,8 +41,6 @@ import java.util.List;
public class DistinctCountAggregatorFactory extends AggregatorFactory
{
private static final Logger log = new Logger(DistinctCountAggregatorFactory.class);
private static final byte CACHE_TYPE_ID = 20;
private static final BitMapFactory DEFAULT_BITMAP_FACTORY = new RoaringBitMapFactory();
private final String name;
@ -178,7 +174,7 @@ public class DistinctCountAggregatorFactory extends AggregatorFactory
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
byte[] bitMapFactoryCacheKey = StringUtils.toUtf8(bitMapFactory.toString());
return ByteBuffer.allocate(2 + fieldNameBytes.length + bitMapFactoryCacheKey.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.DISTINCT_COUNT_CACHE_KEY)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(bitMapFactoryCacheKey)

View File

@ -85,6 +85,12 @@ public class DistinctCountBufferAggregator implements BufferAggregator
return buf.getLong(position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getLong(position);
}
@Override
public void close()
{

View File

@ -47,17 +47,23 @@ public class EmptyDistinctCountAggregator implements Aggregator
@Override
public float getFloat()
{
return (float) 0;
return 0.0f;
}
@Override
public long getLong()
{
return 0L;
}
@Override
public double getDouble()
{
return 0.0;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return (long) 0;
}
}

View File

@ -69,6 +69,12 @@ public final class EmptyDistinctCountBufferAggregator implements BufferAggregato
return (long) 0;
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return 0;
}
@Override
public void close()
{

View File

@ -147,7 +147,10 @@ public class ScanQueryRunnerTest
"index",
"indexMin",
"indexMaxPlusTen",
"quality_uniques"
"quality_uniques",
"indexFloat",
"indexMaxFloat",
"indexMinFloat"
);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
@ -185,7 +188,10 @@ public class ScanQueryRunnerTest
"index",
"indexMin",
"indexMaxPlusTen",
"quality_uniques"
"quality_uniques",
"indexFloat",
"indexMaxFloat",
"indexMinFloat"
);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)

View File

@ -86,9 +86,9 @@ public class TimestampAggregator implements Aggregator
}
@Override
public void close()
public double getDouble()
{
// no resource to cleanup
return (double) most;
}
@Override
@ -97,6 +97,12 @@ public class TimestampAggregator implements Aggregator
return most;
}
@Override
public void close()
{
// no resource to cleanup
}
@Override
public Aggregator clone()
{

View File

@ -36,8 +36,6 @@ import java.util.Objects;
public class TimestampAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 31;
final String name;
final String fieldName;
final String timeFormat;
@ -152,7 +150,7 @@ public class TimestampAggregatorFactory extends AggregatorFactory
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length)
.put(CACHE_TYPE_ID).put(fieldNameBytes).array();
.put(AggregatorUtil.TIMESTAMP_CACHE_TYPE_ID).put(fieldNameBytes).array();
}
@Override

View File

@ -70,7 +70,7 @@ public class TimestampBufferAggregator implements BufferAggregator
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float)buf.getLong(position);
return (float) buf.getLong(position);
}
@Override
@ -79,6 +79,12 @@ public class TimestampBufferAggregator implements BufferAggregator
return buf.getLong(position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getLong(position);
}
@Override
public void close()
{

View File

@ -180,6 +180,12 @@ public class MapVirtualColumn implements VirtualColumn
return null;
}
@Override
public DoubleColumnSelector makeDoubleColumnSelector(String columnName, ColumnSelectorFactory factory)
{
return null;
}
@Override
public ColumnCapabilities capabilities(String columnName)
{

View File

@ -55,6 +55,12 @@ public class EmptySketchAggregator implements Aggregator
throw new UnsupportedOperationException("Not implemented");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{

View File

@ -65,6 +65,12 @@ public final class EmptySketchBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("Not implemented");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{

View File

@ -80,6 +80,12 @@ public class SketchAggregator implements Aggregator
throw new UnsupportedOperationException("Not implemented");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{

View File

@ -117,6 +117,12 @@ public class SketchBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("Not implemented");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{

View File

@ -23,15 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import java.util.Collections;
import java.util.List;
public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 15;
private final boolean shouldFinalize;
private final boolean isInputThetaSketch;
private final Integer errorBoundsStdDev;
@ -46,7 +44,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
@JsonProperty("errorBoundsStdDev") Integer errorBoundsStdDev
)
{
super(name, fieldName, size, CACHE_TYPE_ID);
super(name, fieldName, size, AggregatorUtil.SKETCH_MERGE_CACHE_TYPE_ID);
this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize.booleanValue();
this.isInputThetaSketch = (isInputThetaSketch == null) ? false : isInputThetaSketch.booleanValue();
this.errorBoundsStdDev = errorBoundsStdDev;
@ -161,18 +159,18 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
if (shouldFinalize != that.shouldFinalize) {
return false;
}
if (errorBoundsStdDev == null ^ that.errorBoundsStdDev == null) {
// one of the two stddevs (not both) are null
return false;
}
if (errorBoundsStdDev != null && that.errorBoundsStdDev != null &&
if (errorBoundsStdDev != null && that.errorBoundsStdDev != null &&
errorBoundsStdDev.intValue() != that.errorBoundsStdDev.intValue()) {
// neither stddevs are null, Integer values don't match
return false;
}
return isInputThetaSketch == that.isInputThetaSketch;
}

View File

@ -92,6 +92,12 @@ public class ApproximateHistogramAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -31,6 +31,7 @@ import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import org.apache.commons.codec.binary.Base64;
@ -44,8 +45,6 @@ import java.util.List;
@JsonTypeName("approxHistogram")
public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 12;
protected final String name;
protected final String fieldName;
@ -234,7 +233,7 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length + Ints.BYTES * 2 + Floats.BYTES * 2)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.APPROX_HIST_CACHE_TYPE_ID)
.put(fieldNameBytes)
.putInt(resolution)
.putInt(numBuckets)

View File

@ -95,6 +95,12 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -90,6 +90,12 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -29,6 +29,7 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
@ -38,7 +39,6 @@ import java.nio.ByteBuffer;
@JsonTypeName("approxHistogramFold")
public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHistogramAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 13;
@JsonCreator
public ApproximateHistogramFoldingAggregatorFactory(
@ -141,7 +141,7 @@ public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHis
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length + Ints.BYTES * 2 + Floats.BYTES * 2)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.APPROX_HIST_FOLDING_CACHE_TYPE_ID)
.put(fieldNameBytes)
.putInt(resolution)
.putInt(numBuckets)

View File

@ -98,6 +98,11 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -125,7 +125,7 @@ public class ApproximateHistogramTopNQueryTest
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),

View File

@ -63,6 +63,12 @@ public abstract class VarianceAggregator implements Aggregator
throw new UnsupportedOperationException("VarianceAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("VarianceAggregator does not support getDouble()");
}
public static final class FloatVarianceAggregator extends VarianceAggregator
{
private final FloatColumnSelector selector;

View File

@ -28,6 +28,7 @@ import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
@ -47,8 +48,6 @@ import java.util.Objects;
@JsonTypeName("variance")
public class VarianceAggregatorFactory extends AggregatorFactory
{
protected static final byte CACHE_TYPE_ID = 16;
protected final String fieldName;
protected final String name;
protected final String estimator;
@ -228,7 +227,7 @@ public class VarianceAggregatorFactory extends AggregatorFactory
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
byte[] inputTypeBytes = StringUtils.toUtf8(inputType);
return ByteBuffer.allocate(2 + fieldNameBytes.length + 1 + inputTypeBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.VARIANCE_CACHE_TYPE_ID)
.put(isVariancePop ? (byte) 1 : 0)
.put(fieldNameBytes)
.put((byte) 0xFF)

View File

@ -74,6 +74,12 @@ public abstract class VarianceBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("VarianceBufferAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
@ -52,7 +53,7 @@ public class VarianceTimeseriesQueryTest
private final QueryRunner runner;
private final boolean descending;
public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending)
public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending, List<AggregatorFactory> aggregatorFactories)
{
this.runner = runner;
this.descending = descending;

View File

@ -435,6 +435,12 @@ public class IndexGeneratorJob implements Jobby
return row.getLongMetric(metric);
}
@Override
public double getDoubleMetric(String metric)
{
return row.getDoubleMetric(metric);
}
@Override
public int compareTo(Row o)
{

View File

@ -111,6 +111,8 @@ public class InputRowSerde
out.writeFloat(agg.getFloat());
} else if (t.equals("long")) {
WritableUtils.writeVLong(out, agg.getLong());
} else if (t.equals("double")) {
out.writeDouble(agg.getDouble());
} else {
//its a complex metric
Object val = agg.get();
@ -212,6 +214,8 @@ public class InputRowSerde
event.put(metric, in.readFloat());
} else if (type.equals("long")) {
event.put(metric, WritableUtils.readVLong(in));
} else if (type.equals("double")) {
event.put(metric, in.readDouble());
} else {
ComplexMetricSerde serde = getComplexMetricSerde(type);
byte[] value = readBytes(in);

View File

@ -81,6 +81,12 @@ public class SegmentInputRow implements InputRow
return delegate.getLongMetric(metric);
}
@Override
public double getDoubleMetric(String metric)
{
return delegate.getDoubleMetric(metric);
}
@Override
public int compareTo(Row row)
{

View File

@ -71,7 +71,7 @@ public class InputRowSerdeTest
{
// Prepare the mocks & set close() call count expectation to 1
final Aggregator mockedAggregator = EasyMock.createMock(DoubleSumAggregator.class);
EasyMock.expect(mockedAggregator.getFloat()).andReturn(0f).times(1);
EasyMock.expect(mockedAggregator.getDouble()).andReturn(0d).times(1);
mockedAggregator.aggregate();
EasyMock.expectLastCall().times(1);
mockedAggregator.close();

View File

@ -173,7 +173,7 @@ public class HadoopConverterJobTest
Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]),
new DoubleSumAggregatorFactory(TestIndex.DOUBLE_METRICS[0], TestIndex.DOUBLE_METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
},
new UniformGranularitySpec(

View File

@ -415,7 +415,7 @@
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"has_links": "Yes",
"tweet_length": 3.143742E7,
"tweet_length": 3.1437419E7,
"num_tweets": 376237.0
}
},
@ -424,7 +424,7 @@
"timestamp": "2013-01-02T00:00:00.000Z",
"event": {
"has_links": "No",
"tweet_length": 2.10402688E8,
"tweet_length": 2.10402683E8,
"num_tweets": 3375243.0
}
},
@ -433,7 +433,7 @@
"timestamp": "2013-01-02T00:00:00.000Z",
"event": {
"has_links": "Yes",
"tweet_length": 3.599512E7,
"tweet_length": 3.5995118E7,
"num_tweets": 424223.0
}
},
@ -442,7 +442,7 @@
"timestamp": "2013-01-03T00:00:00.000Z",
"event": {
"has_links": "No",
"tweet_length": 1.96451456E8,
"tweet_length": 1.9645145E8,
"num_tweets": 3144985.0
}
},
@ -451,7 +451,7 @@
"timestamp": "2013-01-03T00:00:00.000Z",
"event": {
"has_links": "Yes",
"tweet_length": 3.4913568E7,
"tweet_length": 3.4913569E7,
"num_tweets": 407434.0
}
}
@ -528,7 +528,7 @@
"timestamp": "2013-01-01T00:00:00.000Z",
"event": {
"has_links": "No",
"tweet_length": 7.4820448E7,
"tweet_length": 7.4820449E7,
"num_tweets": 1170229.0
}
},
@ -546,7 +546,7 @@
"timestamp": "2013-01-02T00:00:00.000Z",
"event": {
"has_links": "No",
"tweet_length": 2.10402688E8,
"tweet_length": 2.10402683E8,
"num_tweets": 3375243.0
}
},
@ -555,7 +555,7 @@
"timestamp": "2013-01-02T00:00:00.000Z",
"event": {
"has_links": "Yes",
"tweet_length": 3.599512E7,
"tweet_length": 3.5995118E7,
"num_tweets": 424223.0
}
},
@ -564,7 +564,7 @@
"timestamp": "2013-01-03T00:00:00.000Z",
"event": {
"has_links": "No",
"tweet_length": 1.59141088E8,
"tweet_length": 1.59141096E8,
"num_tweets": 2567986.0
}
},

View File

@ -29,6 +29,9 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.FloatMaxAggregatorFactory;
import io.druid.query.aggregation.FloatMinAggregatorFactory;
import io.druid.query.aggregation.FloatSumAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
@ -37,12 +40,14 @@ import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
@ -81,7 +86,10 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "count", value = CountAggregatorFactory.class),
@JsonSubTypes.Type(name = "longSum", value = LongSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleSum", value = DoubleSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatSum", value = FloatSumAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleMax", value = DoubleMaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatMin", value = FloatMinAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatMax", value = FloatMaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleMin", value = DoubleMinAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMax", value = LongMaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMin", value = LongMinAggregatorFactory.class),
@ -92,8 +100,10 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class),
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class)
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin
{

View File

@ -47,7 +47,7 @@ public class DruidMetrics
int retVal = 0;
for (AggregatorFactory agg : aggs) {
// This needs to change when we have support column types better
if (!agg.getTypeName().equals("float") && !agg.getTypeName().equals("long")) {
if (!agg.getTypeName().equals("float") && !agg.getTypeName().equals("long") && !agg.getTypeName().equals("double")) {
retVal++;
}
}

View File

@ -23,9 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;

View File

@ -40,9 +40,9 @@ public interface Aggregator extends Closeable
void reset();
Object get();
float getFloat();
long getLong();
double getDouble();
@Override
void close();
long getLong();
}

View File

@ -24,6 +24,7 @@ import io.druid.java.util.common.Pair;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.virtual.ExpressionSelectors;
@ -36,6 +37,33 @@ import java.util.Set;
public class AggregatorUtil
{
public static final byte STRING_SEPARATOR = (byte) 0xFF;
public static final byte COUNT_CACHE_TYPE_ID = 0x0;
public static final byte LONG_SUM_CACHE_TYPE_ID = 0x1;
public static final byte DOUBLE_SUM_CACHE_TYPE_ID = 0x2;
public static final byte DOUBLE_MAX_CACHE_TYPE_ID = 0x3;
public static final byte DOUBLE_MIN_CACHE_TYPE_ID = 0x4;
public static final byte HYPER_UNIQUE_CACHE_TYPE_ID = 0x5;
public static final byte JS_CACHE_TYPE_ID = 0x6;
public static final byte HIST_CACHE_TYPE_ID = 0x7;
public static final byte CARD_CACHE_TYPE_ID = 0x8;
public static final byte FILTERED_AGG_CACHE_TYPE_ID = 0x9;
public static final byte LONG_MAX_CACHE_TYPE_ID = 0xA;
public static final byte LONG_MIN_CACHE_TYPE_ID = 0xB;
public static final byte FLOAT_SUM_CACHE_TYPE_ID = 0xC;
public static final byte FLOAT_MAX_CACHE_TYPE_ID = 0xD;
public static final byte FLOAT_MIN_CACHE_TYPE_ID = 0xE;
public static final byte SKETCH_MERGE_CACHE_TYPE_ID = 0xF;
public static final byte DISTINCT_COUNT_CACHE_KEY = 0x10;
public static final byte FLOAT_LAST_CACHE_TYPE_ID = 0x11;
public static final byte APPROX_HIST_CACHE_TYPE_ID = 0x12;
public static final byte APPROX_HIST_FOLDING_CACHE_TYPE_ID = 0x13;
public static final byte DOUBLE_FIRST_CACHE_TYPE_ID = 0x14;
public static final byte DOUBLE_LAST_CACHE_TYPE_ID = 0x15;
public static final byte FLOAT_FIRST_CACHE_TYPE_ID = 0x16;
public static final byte LONG_FIRST_CACHE_TYPE_ID = 0x17;
public static final byte LONG_LAST_CACHE_TYPE_ID = 0x18;
public static final byte TIMESTAMP_CACHE_TYPE_ID = 0x19;
public static final byte VARIANCE_CACHE_TYPE_ID = 0x1A;
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
@ -132,4 +160,25 @@ public class AggregatorUtil
}
throw new IllegalArgumentException("Must have a valid, non-null fieldName or expression");
}
public static DoubleColumnSelector getDoubleColumnSelector(
final ColumnSelectorFactory metricFactory,
final ExprMacroTable macroTable,
final String fieldName,
final String fieldExpression,
final double nullValue
)
{
if (fieldName != null && fieldExpression == null) {
return metricFactory.makeDoubleColumnSelector(fieldName);
}
if (fieldName == null && fieldExpression != null) {
return ExpressionSelectors.makeDoubleColumnSelector(
metricFactory,
Parser.parse(fieldExpression, macroTable),
nullValue
);
}
throw new IllegalArgumentException("Must have a valid, non-null fieldName or expression");
}
}

View File

@ -112,6 +112,23 @@ public interface BufferAggregator extends HotLoopCallee
*/
long getLong(ByteBuffer buf, int position);
/**
* Returns the double representation of the given aggregate byte array
*
* Converts the given byte buffer representation into the intermediate aggregate value.
*
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
*
* Implementations are only required to support this method if they are aggregations which
* have an {@link AggregatorFactory#getTypeName()} of "double".
* If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended.
*
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the double representation of the aggregate
*/
double getDouble(ByteBuffer buf, int position);
/**
* Release any resources used by the aggregator
*/

View File

@ -68,6 +68,12 @@ public class CountAggregator implements Aggregator
return count;
}
@Override
public double getDouble()
{
return (double) count;
}
@Override
public Aggregator clone()
{

View File

@ -34,7 +34,6 @@ import java.util.List;
*/
public class CountAggregatorFactory extends AggregatorFactory
{
private static final byte[] CACHE_KEY = new byte[]{0x0};
private final String name;
@JsonCreator
@ -111,7 +110,7 @@ public class CountAggregatorFactory extends AggregatorFactory
@Override
public byte[] getCacheKey()
{
return CACHE_KEY;
return new byte[]{AggregatorUtil.COUNT_CACHE_TYPE_ID};
}
@Override

View File

@ -52,6 +52,12 @@ public class CountBufferAggregator implements BufferAggregator
return buf.getLong(position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)

View File

@ -19,7 +19,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.util.Comparator;
@ -34,11 +34,11 @@ public class DoubleMaxAggregator implements Aggregator
return Math.max(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
}
private final FloatColumnSelector selector;
private final DoubleColumnSelector selector;
private double max;
public DoubleMaxAggregator(FloatColumnSelector selector)
public DoubleMaxAggregator(DoubleColumnSelector selector)
{
this.selector = selector;
@ -75,6 +75,12 @@ public class DoubleMaxAggregator implements Aggregator
return (long) max;
}
@Override
public double getDouble()
{
return max;
}
@Override
public Aggregator clone()
{
@ -86,4 +92,5 @@ public class DoubleMaxAggregator implements Aggregator
{
// no resources to cleanup
}
}

View File

@ -22,30 +22,19 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
*/
public class DoubleMaxAggregatorFactory extends AggregatorFactory
public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x3;
private final String name;
private final String fieldName;
private final String expression;
private final ExprMacroTable macroTable;
@JsonCreator
public DoubleMaxAggregatorFactory(
@ -55,16 +44,7 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
@JacksonInject ExprMacroTable macroTable
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
"Must have a valid, non-null fieldName or expression"
);
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
this.macroTable = macroTable;
super(macroTable, fieldName, name, expression);
}
public DoubleMaxAggregatorFactory(String name, String fieldName)
@ -75,30 +55,13 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleMaxAggregator(getFloatColumnSelector(metricFactory));
return new DoubleMaxAggregator(getDoubleColumnSelector(metricFactory, Double.NEGATIVE_INFINITY));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleMaxBufferAggregator(getFloatColumnSelector(metricFactory));
}
private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(
metricFactory,
macroTable,
fieldName,
expression,
Float.NEGATIVE_INFINITY
);
}
@Override
public Comparator getComparator()
{
return DoubleMaxAggregator.COMPARATOR;
return new DoubleMaxBufferAggregator(getDoubleColumnSelector(metricFactory, Double.NEGATIVE_INFINITY));
}
@Override
@ -113,15 +76,6 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
return new DoubleMaxAggregatorFactory(name, name, null, macroTable);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
@ -129,49 +83,12 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
return Collections.singletonList(new DoubleMaxAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Double.parseDouble((String) object);
}
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return fieldName != null
? Collections.singletonList(fieldName)
: Parser.findRequiredBindings(Parser.parse(expression, macroTable));
}
@Override
public byte[] getCacheKey()
{
@ -179,25 +96,13 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.DOUBLE_MAX_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Doubles.BYTES;
}
@Override
public String toString()
{
@ -232,13 +137,4 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (expression != null ? expression.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -19,7 +19,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.nio.ByteBuffer;
@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
public class DoubleMaxBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleMaxBufferAggregator(FloatColumnSelector selector)
DoubleMaxBufferAggregator(DoubleColumnSelector selector)
{
super(selector);
}

View File

@ -19,7 +19,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.util.Comparator;
@ -34,11 +34,11 @@ public class DoubleMinAggregator implements Aggregator
return Math.min(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
}
private final FloatColumnSelector selector;
private final DoubleColumnSelector selector;
private double min;
public DoubleMinAggregator(FloatColumnSelector selector)
public DoubleMinAggregator(DoubleColumnSelector selector)
{
this.selector = selector;
@ -75,6 +75,12 @@ public class DoubleMinAggregator implements Aggregator
return (long) min;
}
@Override
public double getDouble()
{
return min;
}
@Override
public Aggregator clone()
{

View File

@ -22,32 +22,20 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
*/
public class DoubleMinAggregatorFactory extends AggregatorFactory
public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x4;
private final String name;
private final String fieldName;
private final String expression;
private final ExprMacroTable macroTable;
@JsonCreator
public DoubleMinAggregatorFactory(
@JsonProperty("name") String name,
@ -56,16 +44,7 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
@JacksonInject ExprMacroTable macroTable
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
"Must have a valid, non-null fieldName or expression"
);
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
this.macroTable = macroTable;
super(macroTable, fieldName, name, expression);
}
public DoubleMinAggregatorFactory(String name, String fieldName)
@ -76,24 +55,13 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleMinAggregator(getFloatColumnSelector(metricFactory));
return new DoubleMinAggregator(getDoubleColumnSelector(metricFactory, Double.POSITIVE_INFINITY));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleMinBufferAggregator(getFloatColumnSelector(metricFactory));
}
private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(
metricFactory,
macroTable,
fieldName,
expression,
Float.POSITIVE_INFINITY
);
return new DoubleMinBufferAggregator(getDoubleColumnSelector(metricFactory, Double.POSITIVE_INFINITY));
}
@Override
@ -114,16 +82,6 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
return new DoubleMinAggregatorFactory(name, name, null, macroTable);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
@ -135,48 +93,6 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Double.parseDouble((String) object);
}
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return fieldName != null
? Collections.singletonList(fieldName)
: Parser.findRequiredBindings(Parser.parse(expression, macroTable));
}
@Override
public byte[] getCacheKey()
@ -185,25 +101,13 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.DOUBLE_MIN_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Doubles.BYTES;
}
@Override
public String toString()
{
@ -238,13 +142,4 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (expression != null ? expression.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -19,7 +19,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.nio.ByteBuffer;
@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
public class DoubleMinBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleMinBufferAggregator(FloatColumnSelector selector)
DoubleMinBufferAggregator(DoubleColumnSelector selector)
{
super(selector);
}

View File

@ -21,7 +21,7 @@ package io.druid.query.aggregation;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.util.Comparator;
@ -43,11 +43,11 @@ public class DoubleSumAggregator implements Aggregator
return ((Number) lhs).doubleValue() + ((Number) rhs).doubleValue();
}
private final FloatColumnSelector selector;
private final DoubleColumnSelector selector;
private double sum;
public DoubleSumAggregator(FloatColumnSelector selector)
public DoubleSumAggregator(DoubleColumnSelector selector)
{
this.selector = selector;
@ -95,4 +95,10 @@ public class DoubleSumAggregator implements Aggregator
{
// no resources to cleanup
}
@Override
public double getDouble()
{
return sum;
}
}

View File

@ -22,31 +22,19 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
*/
public class DoubleSumAggregatorFactory extends AggregatorFactory
public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x2;
private final String name;
private final String fieldName;
private final String expression;
private final ExprMacroTable macroTable;
@JsonCreator
public DoubleSumAggregatorFactory(
@ -56,16 +44,7 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
@JacksonInject ExprMacroTable macroTable
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
"Must have a valid, non-null fieldName or expression"
);
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
this.macroTable = macroTable;
super(macroTable, fieldName, name, expression);
}
public DoubleSumAggregatorFactory(String name, String fieldName)
@ -76,24 +55,13 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleSumAggregator(getFloatColumnSelector(metricFactory));
return new DoubleSumAggregator(getDoubleColumnSelector(metricFactory, 0.0));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleSumBufferAggregator(getFloatColumnSelector(metricFactory));
}
private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory)
{
return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, 0f);
}
@Override
public Comparator getComparator()
{
return DoubleSumAggregator.COMPARATOR;
return new DoubleSumBufferAggregator(getDoubleColumnSelector(metricFactory, 0.0));
}
@Override
@ -108,65 +76,12 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
return new DoubleSumAggregatorFactory(name, name, null, macroTable);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.asList(new DoubleSumAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Double.parseDouble((String) object);
}
return object;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return fieldName != null
? Collections.singletonList(fieldName)
: Parser.findRequiredBindings(Parser.parse(expression, macroTable));
}
@Override
public byte[] getCacheKey()
{
@ -174,25 +89,13 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.DOUBLE_SUM_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Doubles.BYTES;
}
@Override
public String toString()
{
@ -228,12 +131,4 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
return true;
}
@Override
public int hashCode()
{
int result = fieldName != null ? fieldName.hashCode() : 0;
result = 31 * result + (expression != null ? expression.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -19,7 +19,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.nio.ByteBuffer;
@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
public class DoubleSumBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleSumBufferAggregator(FloatColumnSelector selector)
DoubleSumBufferAggregator(DoubleColumnSelector selector)
{
super(selector);
}

View File

@ -64,6 +64,12 @@ public class FilteredAggregator implements Aggregator
return delegate.getLong();
}
@Override
public double getDouble()
{
return delegate.getDouble();
}
@Override
public void close()
{

View File

@ -32,8 +32,6 @@ import java.util.List;
public class FilteredAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x9;
private final AggregatorFactory delegate;
private final DimFilter filter;
@ -118,7 +116,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
byte[] filterCacheKey = filter.getCacheKey();
byte[] aggregatorCacheKey = delegate.getCacheKey();
return ByteBuffer.allocate(1 + filterCacheKey.length + aggregatorCacheKey.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.FILTERED_AGG_CACHE_TYPE_ID)
.put(filterCacheKey)
.put(aggregatorCacheKey)
.array();

View File

@ -67,6 +67,12 @@ public class FilteredBufferAggregator implements BufferAggregator
return delegate.getFloat(buf, position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return delegate.getDouble(buf, position);
}
@Override
public void close()
{

View File

@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import java.util.Comparator;
/**
*/
public class FloatMaxAggregator implements Aggregator
{
static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR;
static double combineValues(Object lhs, Object rhs)
{
return Math.max(((Number) lhs).floatValue(), ((Number) rhs).floatValue());
}
private final FloatColumnSelector selector;
private float max;
public FloatMaxAggregator(FloatColumnSelector selector)
{
this.selector = selector;
reset();
}
@Override
public void aggregate()
{
max = Math.max(max, selector.get());
}
@Override
public void reset()
{
max = Float.NEGATIVE_INFINITY;
}
@Override
public Object get()
{
return max;
}
@Override
public float getFloat()
{
return max;
}
@Override
public long getLong()
{
return (long) max;
}
@Override
public double getDouble()
{
return (double) max;
}
@Override
public Aggregator clone()
{
return new FloatMaxAggregator(selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
*/
public class FloatMaxAggregatorFactory extends SimpleFloatAggregatorFactory
{
@JsonCreator
public FloatMaxAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public FloatMaxAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new FloatMaxAggregator(getFloatColumnSelector(metricFactory, Float.NEGATIVE_INFINITY));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FloatMaxBufferAggregator(getFloatColumnSelector(metricFactory, Float.NEGATIVE_INFINITY));
}
@Override
public Object combine(Object lhs, Object rhs)
{
return FloatMaxAggregator.combineValues(finalizeComputation(lhs), finalizeComputation(rhs));
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatMaxAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new FloatMaxAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.FLOAT_MAX_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String toString()
{
return "FloatMaxAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FloatMaxAggregatorFactory that = (FloatMaxAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class FloatMaxBufferAggregator extends SimpleFloatBufferAggregator
{
FloatMaxBufferAggregator(FloatColumnSelector selector)
{
super(selector);
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putFloat(position, Float.NEGATIVE_INFINITY);
}
@Override
public void putFirst(ByteBuffer buf, int position, float value)
{
if (!Float.isNaN(value)) {
buf.putFloat(position, value);
} else {
init(buf, position);
}
}
@Override
public void aggregate(ByteBuffer buf, int position, float value)
{
buf.putFloat(position, Math.max(buf.getFloat(position), value));
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import java.util.Comparator;
/**
*/
public class FloatMinAggregator implements Aggregator
{
static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR;
static double combineValues(Object lhs, Object rhs)
{
return Math.min(((Number) lhs).floatValue(), ((Number) rhs).floatValue());
}
private final FloatColumnSelector selector;
private float min;
public FloatMinAggregator(FloatColumnSelector selector)
{
this.selector = selector;
reset();
}
@Override
public void aggregate()
{
min = Math.min(min, selector.get());
}
@Override
public void reset()
{
min = Float.POSITIVE_INFINITY;
}
@Override
public Object get()
{
return min;
}
@Override
public float getFloat()
{
return min;
}
@Override
public long getLong()
{
return (long) min;
}
@Override
public double getDouble()
{
return (double) min;
}
@Override
public Aggregator clone()
{
return new FloatMinAggregator(selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
*/
public class FloatMinAggregatorFactory extends SimpleFloatAggregatorFactory
{
@JsonCreator
public FloatMinAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public FloatMinAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new FloatMinAggregator(getFloatColumnSelector(metricFactory, Float.POSITIVE_INFINITY));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FloatMinBufferAggregator(getFloatColumnSelector(metricFactory, Float.POSITIVE_INFINITY));
}
@Override
public Object combine(Object lhs, Object rhs)
{
return FloatMinAggregator.combineValues(lhs, rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatMinAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new FloatMinAggregatorFactory(
fieldName,
fieldName,
expression,
macroTable
));
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.FLOAT_MIN_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String toString()
{
return "FloatMinAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FloatMinAggregatorFactory that = (FloatMinAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class FloatMinBufferAggregator extends SimpleFloatBufferAggregator
{
FloatMinBufferAggregator(FloatColumnSelector selector)
{
super(selector);
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putFloat(position, Float.POSITIVE_INFINITY);
}
@Override
public void putFirst(ByteBuffer buf, int position,float value)
{
if (!Float.isNaN(value)) {
buf.putFloat(position, value);
} else {
init(buf, position);
}
}
@Override
public void aggregate(ByteBuffer buf, int position, float value)
{
buf.putFloat(position, Math.min(buf.getFloat(position), value));
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.google.common.collect.Ordering;
import io.druid.segment.FloatColumnSelector;
import java.util.Comparator;
/**
*/
public class FloatSumAggregator implements Aggregator
{
static final Comparator COMPARATOR = new Ordering()
{
@Override
public int compare(Object o, Object o1)
{
return Float.compare(((Number) o).floatValue(), ((Number) o1).floatValue());
}
}.nullsFirst();
static double combineValues(Object lhs, Object rhs)
{
return ((Number) lhs).floatValue() + ((Number) rhs).floatValue();
}
private final FloatColumnSelector selector;
private float sum;
public FloatSumAggregator(FloatColumnSelector selector)
{
this.selector = selector;
this.sum = 0;
}
@Override
public void aggregate()
{
sum += selector.get();
}
@Override
public void reset()
{
sum = 0;
}
@Override
public Object get()
{
return sum;
}
@Override
public float getFloat()
{
return sum;
}
@Override
public long getLong()
{
return (long) sum;
}
@Override
public double getDouble()
{
return (double) sum;
}
@Override
public Aggregator clone()
{
return new FloatSumAggregator(selector);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
*/
public class FloatSumAggregatorFactory extends SimpleFloatAggregatorFactory
{
@JsonCreator
public FloatSumAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("expression") String expression,
@JacksonInject ExprMacroTable macroTable
)
{
super(macroTable, name, fieldName, expression);
}
public FloatSumAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new FloatSumAggregator(getFloatColumnSelector(metricFactory, 0.0f));
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FloatSumBufferAggregator(getFloatColumnSelector(metricFactory, 0.0f));
}
@Override
public Object combine(Object lhs, Object rhs)
{
return FloatSumAggregator.combineValues(lhs, rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatSumAggregatorFactory(name, name, null, macroTable);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.asList(new FloatSumAggregatorFactory(fieldName, fieldName, expression, macroTable));
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(AggregatorUtil.FLOAT_SUM_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
}
@Override
public String toString()
{
return "FloatSumAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FloatSumAggregatorFactory that = (FloatSumAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
/**
*/
public class FloatSumBufferAggregator extends SimpleFloatBufferAggregator
{
FloatSumBufferAggregator(FloatColumnSelector selector)
{
super(selector);
}
@Override
public void putFirst(ByteBuffer buf, int position, float value)
{
buf.putFloat(position, value);
}
@Override
public void aggregate(ByteBuffer buf, int position, float value)
{
buf.putFloat(position, buf.getFloat(position) + value);
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putFloat(position, 0.0f);
}
}

View File

@ -80,6 +80,12 @@ public class HistogramAggregator implements Aggregator
throw new UnsupportedOperationException("HistogramAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("HistogramAggregator does not support getDouble()");
}
@Override
public void close()
{

View File

@ -36,8 +36,6 @@ import java.util.List;
public class HistogramAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x7;
private final String name;
private final String fieldName;
private final List<Float> breaksList;
@ -153,7 +151,7 @@ public class HistogramAggregatorFactory extends AggregatorFactory
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
ByteBuffer buf = ByteBuffer
.allocate(1 + fieldNameBytes.length + Floats.BYTES * breaks.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.HIST_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte) 0xFF);
buf.asFloatBuffer().put(breaks);

View File

@ -101,6 +101,12 @@ public class HistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getDouble");
}
@Override
public void close()
{

View File

@ -80,6 +80,12 @@ public class JavaScriptAggregator implements Aggregator
return (long) current;
}
@Override
public double getDouble()
{
return current;
}
@Override
public void close()
{

View File

@ -50,8 +50,6 @@ import java.util.Objects;
public class JavaScriptAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x6;
private final String name;
private final List<String> fieldNames;
private final String fnAggregate;
@ -240,7 +238,7 @@ public class JavaScriptAggregatorFactory extends AggregatorFactory
byte[] sha1 = md.digest(StringUtils.toUtf8(fnAggregate + fnReset + fnCombine));
return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.JS_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(sha1)
.array();

View File

@ -71,6 +71,12 @@ public class JavaScriptBufferAggregator implements BufferAggregator
return (long) buf.getDouble(position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public void close() {
script.close();

View File

@ -51,6 +51,12 @@ public abstract class LongBufferAggregator implements BufferAggregator
return buf.getLong(position);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return (double) buf.getLong(position);
}
@Override
public void close()
{

View File

@ -75,6 +75,12 @@ public class LongMaxAggregator implements Aggregator
return max;
}
@Override
public double getDouble()
{
return (double) max;
}
@Override
public Aggregator clone()
{

View File

@ -41,8 +41,6 @@ import java.util.Objects;
*/
public class LongMaxAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xA;
private final String name;
private final String fieldName;
private final String expression;
@ -170,7 +168,7 @@ public class LongMaxAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.LONG_MAX_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)

View File

@ -75,6 +75,12 @@ public class LongMinAggregator implements Aggregator
return min;
}
@Override
public double getDouble()
{
return (double) min;
}
@Override
public Aggregator clone()
{

View File

@ -41,7 +41,6 @@ import java.util.Objects;
*/
public class LongMinAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xB;
private final String name;
private final String fieldName;
@ -170,7 +169,7 @@ public class LongMinAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.LONG_MIN_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)

View File

@ -83,6 +83,12 @@ public class LongSumAggregator implements Aggregator
return sum;
}
@Override
public double getDouble()
{
return (double) sum;
}
@Override
public Aggregator clone()
{

View File

@ -41,8 +41,6 @@ import java.util.Objects;
*/
public class LongSumAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x1;
private final String name;
private final String fieldName;
private final String expression;
@ -170,7 +168,7 @@ public class LongSumAggregatorFactory extends AggregatorFactory
byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(CACHE_TYPE_ID)
.put(AggregatorUtil.LONG_SUM_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)

View File

@ -54,6 +54,12 @@ public final class NoopAggregator implements Aggregator
return 0;
}
@Override
public double getDouble()
{
return 0;
}
@Override
public void close()
{

View File

@ -65,6 +65,12 @@ public final class NoopBufferAggregator implements BufferAggregator
return 0L;
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return 0d;
}
@Override
public void close()
{

View File

@ -0,0 +1,141 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DoubleColumnSelector;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
{
protected final String name;
protected final String fieldName;
protected final String expression;
protected final ExprMacroTable macroTable;
public SimpleDoubleAggregatorFactory(
ExprMacroTable macroTable,
String fieldName,
String name,
String expression
)
{
this.macroTable = macroTable;
this.fieldName = fieldName;
this.name = name;
this.expression = expression;
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
"Must have a valid, non-null fieldName or expression"
);
}
protected DoubleColumnSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory, Double nullValue)
{
return AggregatorUtil.getDoubleColumnSelector(metricFactory, macroTable, fieldName, expression, nullValue);
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Double.parseDouble((String) object);
}
return object;
}
@Override
public String getTypeName()
{
return "double";
}
@Override
public int getMaxIntermediateSize()
{
return Double.BYTES;
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, expression, name);
}
@Override
public Comparator getComparator()
{
return DoubleSumAggregator.COMPARATOR;
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<String> requiredFields()
{
return fieldName != null
? Collections.singletonList(fieldName)
: Parser.findRequiredBindings(Parser.parse(expression, macroTable));
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public String getExpression()
{
return expression;
}
}

View File

@ -21,20 +21,20 @@ package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import java.nio.ByteBuffer;
public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
{
protected final FloatColumnSelector selector;
protected final DoubleColumnSelector selector;
SimpleDoubleBufferAggregator(FloatColumnSelector selector)
SimpleDoubleBufferAggregator(DoubleColumnSelector selector)
{
this.selector = selector;
}
public FloatColumnSelector getSelector()
public DoubleColumnSelector getSelector()
{
return selector;
}
@ -53,7 +53,7 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
@Override
public final void aggregate(ByteBuffer buf, int position)
{
aggregate(buf, position, (double) selector.get());
aggregate(buf, position, selector.get());
}
@Override
@ -74,6 +74,12 @@ public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
return (long) buf.getDouble(position);
}
@Override
public double getDouble(ByteBuffer buffer, int position)
{
return buffer.getDouble(position);
}
@Override
public void close()
{

View File

@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.google.common.base.Preconditions;
import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.FloatColumnSelector;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public abstract class SimpleFloatAggregatorFactory extends AggregatorFactory
{
protected final String name;
protected final String fieldName;
protected final String expression;
protected final ExprMacroTable macroTable;
public SimpleFloatAggregatorFactory(
ExprMacroTable macroTable,
String name,
final String fieldName,
String expression
)
{
this.macroTable = macroTable;
this.name = name;
this.fieldName = fieldName;
this.expression = expression;
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
"Must have a valid, non-null fieldName or expression"
);
}
protected FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory, Float nullValue)
{
return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, nullValue);
}
@Override
public Object deserialize(Object object)
{
// handle "NaN" / "Infinity" values serialized as strings in JSON
if (object instanceof String) {
return Float.parseFloat((String) object);
}
return object;
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Float.BYTES;
}
@Override
public Comparator getComparator()
{
return FloatSumAggregator.COMPARATOR;
}
@Override
public Object finalizeComputation(Object object)
{
return object;
}
@Override
public List<String> requiredFields()
{
return fieldName != null
? Collections.singletonList(fieldName)
: Parser.findRequiredBindings(Parser.parse(expression, macroTable));
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, expression, name);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
public abstract class SimpleFloatBufferAggregator implements BufferAggregator
{
protected final FloatColumnSelector selector;
SimpleFloatBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
}
public FloatColumnSelector getSelector()
{
return selector;
}
/**
* Faster equivalent to
* aggregator.init(buf, position);
* aggregator.aggregate(buf, position, value);
*/
@CalledFromHotLoop
public abstract void putFirst(ByteBuffer buf, int position, float value);
@CalledFromHotLoop
public abstract void aggregate(ByteBuffer buf, int position, float value);
@Override
public final void aggregate(ByteBuffer buf, int position)
{
aggregate(buf, position, selector.get());
}
@Override
public final Object get(ByteBuffer buf, int position)
{
return buf.getFloat(position);
}
@Override
public final float getFloat(ByteBuffer buf, int position)
{
return buf.getFloat(position);
}
@Override
public final long getLong(ByteBuffer buf, int position)
{
return (long) buf.getFloat(position);
}
@Override
public double getDouble(ByteBuffer buffer, int position)
{
return (double) buffer.getFloat(position);
}
@Override
public void close()
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -126,6 +126,12 @@ public class CardinalityAggregator implements Aggregator
throw new UnsupportedOperationException("CardinalityAggregator does not support getLong()");
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("CardinalityAggregator does not support getDouble()");
}
@Override
public Aggregator clone()
{

View File

@ -31,6 +31,7 @@ import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
@ -94,8 +95,6 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
return ((HyperLogLogCollector) object).estimateCardinality();
}
private static final byte CACHE_TYPE_ID = (byte) 0x8;
private static final byte CACHE_KEY_SEPARATOR = (byte) 0xFF;
private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY =
new CardinalityAggregatorColumnSelectorStrategyFactory();
@ -283,10 +282,10 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
}
ByteBuffer retBuf = ByteBuffer.allocate(2 + dimSpecKeysLength);
retBuf.put(CACHE_TYPE_ID);
retBuf.put(AggregatorUtil.CARD_CACHE_TYPE_ID);
for (byte[] dimSpecKey : dimSpecKeys) {
retBuf.put(dimSpecKey);
retBuf.put(CACHE_KEY_SEPARATOR);
retBuf.put(AggregatorUtil.STRING_SEPARATOR);
}
retBuf.put((byte) (byRow ? 1 : 0));
return retBuf.array();

View File

@ -97,6 +97,12 @@ public class CardinalityBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getLong()");
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("CardinalityBufferAggregators does not support getDouble()");
}
@Override
public void close()
{

View File

@ -41,6 +41,8 @@ public class CardinalityAggregatorColumnSelectorStrategyFactory
return new LongCardinalityAggregatorColumnSelectorStrategy();
case FLOAT:
return new FloatCardinalityAggregatorColumnSelectorStrategy();
case DOUBLE:
return new DoubleCardinalityAggregatorColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.cardinality.types;
import com.google.common.hash.Hasher;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
import io.druid.segment.DoubleColumnSelector;
public class DoubleCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DoubleColumnSelector>
{
@Override
public void hashRow(DoubleColumnSelector dimSelector, Hasher hasher)
{
hasher.putDouble(dimSelector.get());
}
@Override
public void hashValues(DoubleColumnSelector dimSelector, HyperLogLogCollector collector)
{
collector.add(CardinalityAggregator.hashFn.hashLong(Double.doubleToLongBits(dimSelector.get())).asBytes());
}
}

View File

@ -21,13 +21,13 @@ package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
public class DoubleFirstAggregator implements Aggregator
{
private final FloatColumnSelector valueSelector;
private final DoubleColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
@ -37,7 +37,7 @@ public class DoubleFirstAggregator implements Aggregator
public DoubleFirstAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
DoubleColumnSelector valueSelector
)
{
this.name = name;
@ -77,9 +77,9 @@ public class DoubleFirstAggregator implements Aggregator
}
@Override
public void close()
public double getDouble()
{
return firstValue;
}
@Override
@ -87,5 +87,11 @@ public class DoubleFirstAggregator implements Aggregator
{
return (long) firstValue;
}
@Override
public void close()
{
}
}

View File

@ -29,6 +29,7 @@ import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
@ -40,28 +41,19 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Doubles.compare(((SerializablePair<Long, Double>) o1).rhs, ((SerializablePair<Long, Double>) o2).rhs);
}
};
public static final Comparator VALUE_COMPARATOR = (o1, o2) -> Doubles.compare(
((SerializablePair<Long, Double>) o1).rhs,
((SerializablePair<Long, Double>) o2).rhs
);
public static final Comparator TIME_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Longs.compare(((SerializablePair<Long, Object>) o1).lhs, ((SerializablePair<Long, Object>) o2).lhs);
}
};
private static final byte CACHE_TYPE_ID = 16;
public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
((SerializablePair<Long, Object>) o1).lhs,
((SerializablePair<Long, Object>) o2).lhs
);
private final String fieldName;
private final String name;
@ -85,7 +77,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
return new DoubleFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
metricFactory.makeDoubleColumnSelector(fieldName)
);
}
@ -94,7 +86,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
return new DoubleFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
metricFactory.makeDoubleColumnSelector(fieldName)
);
}
@ -213,23 +205,22 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
return ByteBuffer.allocate(1 + fieldNameBytes.length)
.put(AggregatorUtil.DOUBLE_FIRST_CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}
@Override
public String getTypeName()
{
return "float";
return "double";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
return Long.BYTES + Double.BYTES;
}
@Override
@ -250,9 +241,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
return Objects.hash(fieldName, name);
}
@Override

View File

@ -19,11 +19,10 @@
package io.druid.query.aggregation.first;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.DoubleColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
@ -31,9 +30,9 @@ import java.nio.ByteBuffer;
public class DoubleFirstBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final FloatColumnSelector valueSelector;
private final DoubleColumnSelector valueSelector;
public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector)
public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, DoubleColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
@ -43,7 +42,7 @@ public class DoubleFirstBufferAggregator implements BufferAggregator
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MAX_VALUE);
buf.putDouble(position + Longs.BYTES, 0);
buf.putDouble(position + Long.BYTES, 0);
}
@Override
@ -53,26 +52,32 @@ public class DoubleFirstBufferAggregator implements BufferAggregator
long firstTime = buf.getLong(position);
if (time < firstTime) {
buf.putLong(position, time);
buf.putDouble(position + Longs.BYTES, valueSelector.get());
buf.putDouble(position + Long.BYTES, valueSelector.get());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES));
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Long.BYTES));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Longs.BYTES);
return (float) buf.getDouble(position + Long.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Longs.BYTES);
return (long) buf.getDouble(position + Long.BYTES);
}
@Override
public double getDouble(ByteBuffer buf, int position)
{
return buf.getDouble(position + Long.BYTES);
}
@Override

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
public class FloatFirstAggregator implements Aggregator
{
private final FloatColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
protected long firstTime;
protected float firstValue;
public FloatFirstAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
reset();
}
@Override
public void aggregate()
{
long time = timeSelector.get();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.get();
}
}
@Override
public void reset()
{
firstTime = Long.MAX_VALUE;
firstValue = 0;
}
@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
}
@Override
public float getFloat()
{
return firstValue;
}
@Override
public double getDouble()
{
return (double) firstValue;
}
@Override
public long getLong()
{
return (long) firstValue;
}
@Override
public void close()
{
}
}

View File

@ -0,0 +1,255 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class FloatFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = (o1, o2) -> Doubles.compare(
((SerializablePair<Long, Float>) o1).rhs,
((SerializablePair<Long, Float>) o2).rhs
);
public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
((SerializablePair<Long, Object>) o1).lhs,
((SerializablePair<Long, Object>) o2).lhs
);
private final String fieldName;
private final String name;
@JsonCreator
public FloatFirstAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new FloatFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new FloatFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public Comparator getComparator()
{
return VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new FloatFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new FloatFirstAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Float> pair = (SerializablePair<Long, Float>) selector.get();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new FloatFirstBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Float> pair = (SerializablePair<Long, Float>) selector.get();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putFloat(position + Longs.BYTES, pair.rhs);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new FloatFirstAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).floatValue());
}
@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Float>) object).rhs;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length)
.put(AggregatorUtil.FLOAT_FIRST_CACHE_TYPE_ID)
.put(fieldNameBytes)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Long.BYTES + Float.BYTES;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FloatFirstAggregatorFactory that = (FloatFirstAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
return Objects.hash(fieldName, name);
}
@Override
public String toString()
{
return "FloatFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

Some files were not shown because too many files have changed in this diff Show More