mirror of https://github.com/apache/druid.git
Deprecate Aggregator.getName and AggregatorFactory.getAggregatorStartValue. (#3572)
This commit is contained in:
parent
32c5494e97
commit
89d9c61894
|
@ -26,17 +26,14 @@ import io.druid.segment.DimensionSelector;
|
|||
public class DistinctCountAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final String name;
|
||||
private final DimensionSelector selector;
|
||||
private final MutableBitmap mutableBitmap;
|
||||
|
||||
public DistinctCountAggregator(
|
||||
String name,
|
||||
DimensionSelector selector,
|
||||
MutableBitmap mutableBitmap
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
this.mutableBitmap = mutableBitmap;
|
||||
}
|
||||
|
@ -70,7 +67,7 @@ public class DistinctCountAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -69,10 +69,9 @@ public class DistinctCountAggregatorFactory extends AggregatorFactory
|
|||
{
|
||||
DimensionSelector selector = makeDimensionSelector(columnFactory);
|
||||
if (selector == null) {
|
||||
return new EmptyDistinctCountAggregator(name);
|
||||
return new EmptyDistinctCountAggregator();
|
||||
} else {
|
||||
return new DistinctCountAggregator(
|
||||
name,
|
||||
selector,
|
||||
bitMapFactory.makeEmptyMutableBitmap()
|
||||
);
|
||||
|
@ -197,12 +196,6 @@ public class DistinctCountAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -24,11 +24,8 @@ import io.druid.query.aggregation.Aggregator;
|
|||
public class EmptyDistinctCountAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final String name;
|
||||
|
||||
public EmptyDistinctCountAggregator(String name)
|
||||
public EmptyDistinctCountAggregator()
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -56,7 +53,7 @@ public class EmptyDistinctCountAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,11 +23,8 @@ import io.druid.query.aggregation.Aggregator;
|
|||
|
||||
public class EmptySketchAggregator implements Aggregator
|
||||
{
|
||||
private final String name;
|
||||
|
||||
public EmptySketchAggregator(String name)
|
||||
public EmptySketchAggregator()
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -61,7 +58,7 @@ public class EmptySketchAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,16 +37,12 @@ public class SketchAggregator implements Aggregator
|
|||
private static final Logger logger = new Logger(SketchAggregator.class);
|
||||
|
||||
private final ObjectColumnSelector selector;
|
||||
private final String name;
|
||||
private final int size;
|
||||
|
||||
private Union union;
|
||||
|
||||
public SketchAggregator(String name, ObjectColumnSelector selector, int size)
|
||||
public SketchAggregator(ObjectColumnSelector selector, int size)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
this.size = size;
|
||||
union = new SynchronizedUnion((Union) SetOperation.builder().build(size, Family.UNION));
|
||||
}
|
||||
|
||||
|
@ -93,7 +89,7 @@ public class SketchAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,7 +29,6 @@ import com.yahoo.sketches.Util;
|
|||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.theta.SetOperation;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Sketches;
|
||||
import com.yahoo.sketches.theta.Union;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
|
@ -82,9 +81,9 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
|
|||
{
|
||||
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
|
||||
if (selector == null) {
|
||||
return new EmptySketchAggregator(name);
|
||||
return new EmptySketchAggregator();
|
||||
} else {
|
||||
return new SketchAggregator(name, selector, size);
|
||||
return new SketchAggregator(selector, size);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,12 +171,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
|
|||
return SetOperation.getMaxUnionBytes(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return Sketches.updateSketchBuilder().build(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
|
|
|
@ -41,7 +41,6 @@ public class ApproximateHistogramAggregator implements Aggregator
|
|||
return ((ApproximateHistogram) lhs).foldFast((ApproximateHistogram) rhs);
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final FloatColumnSelector selector;
|
||||
private final int resolution;
|
||||
private final float lowerLimit;
|
||||
|
@ -50,14 +49,12 @@ public class ApproximateHistogramAggregator implements Aggregator
|
|||
private ApproximateHistogram histogram;
|
||||
|
||||
public ApproximateHistogramAggregator(
|
||||
String name,
|
||||
FloatColumnSelector selector,
|
||||
int resolution,
|
||||
float lowerLimit,
|
||||
float upperLimit
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
this.resolution = resolution;
|
||||
this.lowerLimit = lowerLimit;
|
||||
|
@ -98,7 +95,7 @@ public class ApproximateHistogramAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -81,7 +81,6 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
|
|||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new ApproximateHistogramAggregator(
|
||||
name,
|
||||
metricFactory.makeFloatColumnSelector(fieldName),
|
||||
resolution,
|
||||
lowerLimit,
|
||||
|
@ -254,12 +253,6 @@ public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
|
|||
return new ApproximateHistogram(resolution).getMaxStorageSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return new ApproximateHistogram(resolution);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -25,7 +25,6 @@ import io.druid.segment.ObjectColumnSelector;
|
|||
|
||||
public class ApproximateHistogramFoldingAggregator implements Aggregator
|
||||
{
|
||||
private final String name;
|
||||
private final ObjectColumnSelector<ApproximateHistogram> selector;
|
||||
private final int resolution;
|
||||
private final float lowerLimit;
|
||||
|
@ -36,14 +35,12 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
|
|||
private long[] tmpBufferB;
|
||||
|
||||
public ApproximateHistogramFoldingAggregator(
|
||||
String name,
|
||||
ObjectColumnSelector<ApproximateHistogram> selector,
|
||||
int resolution,
|
||||
float lowerLimit,
|
||||
float upperLimit
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
this.resolution = resolution;
|
||||
this.lowerLimit = lowerLimit;
|
||||
|
@ -96,7 +93,7 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -80,7 +80,6 @@ public class ApproximateHistogramFoldingAggregatorFactory extends ApproximateHis
|
|||
final Class cls = selector.classOfObject();
|
||||
if (cls.equals(Object.class) || ApproximateHistogram.class.isAssignableFrom(cls)) {
|
||||
return new ApproximateHistogramFoldingAggregator(
|
||||
name,
|
||||
selector,
|
||||
resolution,
|
||||
lowerLimit,
|
||||
|
|
|
@ -45,14 +45,14 @@ public class ApproximateHistogramPostAggregatorTest
|
|||
ApproximateHistogram ah = buildHistogram(10, VALUES);
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(VALUES);
|
||||
|
||||
ApproximateHistogramAggregator agg = new ApproximateHistogramAggregator("price", selector, 10, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY);
|
||||
ApproximateHistogramAggregator agg = new ApproximateHistogramAggregator(selector, 10, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY);
|
||||
for (int i = 0; i < VALUES.length; i++) {
|
||||
agg.aggregate();
|
||||
selector.increment();
|
||||
}
|
||||
|
||||
Map<String, Object> metricValues = new HashMap<String, Object>();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put("price", agg.get());
|
||||
|
||||
ApproximateHistogramPostAggregator approximateHistogramPostAggregator = new EqualBucketsPostAggregator(
|
||||
"approxHist",
|
||||
|
|
|
@ -28,13 +28,10 @@ import io.druid.segment.ObjectColumnSelector;
|
|||
*/
|
||||
public abstract class VarianceAggregator implements Aggregator
|
||||
{
|
||||
protected final String name;
|
||||
|
||||
protected final VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
|
||||
|
||||
public VarianceAggregator(String name)
|
||||
public VarianceAggregator()
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,7 +49,7 @@ public abstract class VarianceAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -76,9 +73,9 @@ public abstract class VarianceAggregator implements Aggregator
|
|||
{
|
||||
private final FloatColumnSelector selector;
|
||||
|
||||
public FloatVarianceAggregator(String name, FloatColumnSelector selector)
|
||||
public FloatVarianceAggregator(FloatColumnSelector selector)
|
||||
{
|
||||
super(name);
|
||||
super();
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
|
@ -93,9 +90,9 @@ public abstract class VarianceAggregator implements Aggregator
|
|||
{
|
||||
private final LongColumnSelector selector;
|
||||
|
||||
public LongVarianceAggregator(String name, LongColumnSelector selector)
|
||||
public LongVarianceAggregator(LongColumnSelector selector)
|
||||
{
|
||||
super(name);
|
||||
super();
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
|
@ -110,9 +107,9 @@ public abstract class VarianceAggregator implements Aggregator
|
|||
{
|
||||
private final ObjectColumnSelector selector;
|
||||
|
||||
public ObjectVarianceAggregator(String name, ObjectColumnSelector selector)
|
||||
public ObjectVarianceAggregator(ObjectColumnSelector selector)
|
||||
{
|
||||
super(name);
|
||||
super();
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
|
|
|
@ -99,17 +99,11 @@ public class VarianceAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
|
||||
if ("float".equalsIgnoreCase(inputType)) {
|
||||
return new VarianceAggregator.FloatVarianceAggregator(
|
||||
name,
|
||||
metricFactory.makeFloatColumnSelector(fieldName)
|
||||
);
|
||||
return new VarianceAggregator.FloatVarianceAggregator(metricFactory.makeFloatColumnSelector(fieldName));
|
||||
} else if ("long".equalsIgnoreCase(inputType)) {
|
||||
return new VarianceAggregator.LongVarianceAggregator(
|
||||
name,
|
||||
metricFactory.makeLongColumnSelector(fieldName)
|
||||
);
|
||||
return new VarianceAggregator.LongVarianceAggregator(metricFactory.makeLongColumnSelector(fieldName));
|
||||
} else if ("variance".equalsIgnoreCase(inputType)) {
|
||||
return new VarianceAggregator.ObjectVarianceAggregator(name, selector);
|
||||
return new VarianceAggregator.ObjectVarianceAggregator(selector);
|
||||
}
|
||||
throw new IAE(
|
||||
"Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType
|
||||
|
@ -169,12 +163,6 @@ public class VarianceAggregatorFactory extends AggregatorFactory
|
|||
return VarianceAggregatorCollector.COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return new VarianceAggregatorCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
|
|
|
@ -76,8 +76,6 @@ public class VarianceAggregatorTest
|
|||
{
|
||||
VarianceAggregator agg = (VarianceAggregator) aggFactory.factorize(colSelectorFactory);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d);
|
||||
aggregate(selector, agg);
|
||||
assertValues((VarianceAggregatorCollector) agg.get(), 1, 1.1d, 0d);
|
||||
|
|
|
@ -99,9 +99,9 @@ public class InputRowSerde
|
|||
catch (ParseException e) {
|
||||
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
|
||||
if (reportParseExceptions) {
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", k);
|
||||
}
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", k);
|
||||
}
|
||||
|
||||
String t = aggFactory.getTypeName();
|
||||
|
|
|
@ -44,17 +44,6 @@ public class QueryRunnerHelper
|
|||
{
|
||||
private static final Logger log = new Logger(QueryRunnerHelper.class);
|
||||
|
||||
public static Aggregator[] makeAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
|
||||
{
|
||||
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
|
||||
int aggregatorIndex = 0;
|
||||
for (AggregatorFactory spec : aggregatorSpecs) {
|
||||
aggregators[aggregatorIndex] = spec.factorize(cursor);
|
||||
++aggregatorIndex;
|
||||
}
|
||||
return aggregators;
|
||||
}
|
||||
|
||||
public static <T> Sequence<Result<T>> makeCursorBasedQuery(
|
||||
final StorageAdapter adapter,
|
||||
List<Interval> queryIntervals,
|
||||
|
|
|
@ -37,7 +37,11 @@ public interface Aggregator {
|
|||
void reset();
|
||||
Object get();
|
||||
float getFloat();
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* Deprecated, to be removed in 0.10.0. See https://github.com/druid-io/druid/issues/3588.
|
||||
*/
|
||||
@Deprecated String getName();
|
||||
void close();
|
||||
|
||||
long getLong();
|
||||
|
|
|
@ -29,9 +29,9 @@ import java.util.Map;
|
|||
|
||||
/**
|
||||
* Processing related interface
|
||||
*
|
||||
*
|
||||
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
|
||||
*
|
||||
*
|
||||
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
|
||||
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
|
||||
* provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
|
||||
|
@ -78,7 +78,10 @@ public abstract class AggregatorFactory
|
|||
*/
|
||||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
|
||||
{
|
||||
throw new UnsupportedOperationException(String.format("[%s] does not implement getMergingFactory(..)", this.getClass().getName()));
|
||||
throw new UnsupportedOperationException(String.format(
|
||||
"[%s] does not implement getMergingFactory(..)",
|
||||
this.getClass().getName()
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,11 +127,13 @@ public abstract class AggregatorFactory
|
|||
public abstract int getMaxIntermediateSize();
|
||||
|
||||
/**
|
||||
* Returns the starting value for a corresponding aggregator. For example, 0 for sums, - Infinity for max, an empty mogrifier
|
||||
*
|
||||
* @return the starting value for a corresponding aggregator.
|
||||
* Deprecated, to be removed in 0.10.0. See https://github.com/druid-io/druid/issues/3588.
|
||||
*/
|
||||
public abstract Object getAggregatorStartValue();
|
||||
@Deprecated
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
throw new UnsupportedOperationException("getAggregatorStartValue is deprecated");
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the list of AggregatorFactory[] (presumable from metadata of some segments being merged) and
|
||||
|
|
|
@ -56,7 +56,7 @@ public class Aggregators
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return null;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,12 +32,10 @@ public class CountAggregator implements Aggregator
|
|||
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
|
||||
}
|
||||
|
||||
long count = 0;
|
||||
private final String name;
|
||||
private long count = 0;
|
||||
|
||||
public CountAggregator(String name)
|
||||
public CountAggregator()
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -73,13 +71,13 @@ public class CountAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new CountAggregator(name);
|
||||
return new CountAggregator();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -50,7 +50,7 @@ public class CountAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new CountAggregator(name);
|
||||
return new CountAggregator();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,12 +126,6 @@ public class CountAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -35,13 +35,11 @@ public class DoubleMaxAggregator implements Aggregator
|
|||
}
|
||||
|
||||
private final FloatColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private double max;
|
||||
|
||||
public DoubleMaxAggregator(String name, FloatColumnSelector selector)
|
||||
public DoubleMaxAggregator(FloatColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
reset();
|
||||
|
@ -80,13 +78,13 @@ public class DoubleMaxAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new DoubleMaxAggregator(name, selector);
|
||||
return new DoubleMaxAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Doubles;
|
||||
|
||||
import io.druid.common.utils.StringUtils;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
@ -71,7 +70,7 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new DoubleMaxAggregator(name, getFloatColumnSelector(metricFactory));
|
||||
return new DoubleMaxAggregator(getFloatColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,12 +185,6 @@ public class DoubleMaxAggregatorFactory extends AggregatorFactory
|
|||
return Doubles.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -35,13 +35,11 @@ public class DoubleMinAggregator implements Aggregator
|
|||
}
|
||||
|
||||
private final FloatColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private double min;
|
||||
|
||||
public DoubleMinAggregator(String name, FloatColumnSelector selector)
|
||||
public DoubleMinAggregator(FloatColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
reset();
|
||||
|
@ -80,13 +78,13 @@ public class DoubleMinAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new DoubleMinAggregator(name, selector);
|
||||
return new DoubleMinAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Doubles;
|
||||
|
||||
import io.druid.common.utils.StringUtils;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
@ -71,7 +70,7 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new DoubleMinAggregator(name, getFloatColumnSelector(metricFactory));
|
||||
return new DoubleMinAggregator(getFloatColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,12 +185,6 @@ public class DoubleMinAggregatorFactory extends AggregatorFactory
|
|||
return Doubles.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return Double.POSITIVE_INFINITY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -44,13 +44,11 @@ public class DoubleSumAggregator implements Aggregator
|
|||
}
|
||||
|
||||
private final FloatColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private double sum;
|
||||
|
||||
public DoubleSumAggregator(String name, FloatColumnSelector selector)
|
||||
public DoubleSumAggregator(FloatColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
this.sum = 0;
|
||||
|
@ -89,13 +87,13 @@ public class DoubleSumAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new DoubleSumAggregator(name, selector);
|
||||
return new DoubleSumAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -70,7 +70,7 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new DoubleSumAggregator(name, getFloatColumnSelector(metricFactory));
|
||||
return new DoubleSumAggregator(getFloatColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,12 +185,6 @@ public class DoubleSumAggregatorFactory extends AggregatorFactory
|
|||
return Doubles.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -67,7 +67,7 @@ public class FilteredAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return delegate.getName();
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -152,12 +152,6 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
|||
return delegate.getMaxIntermediateSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return delegate.getAggregatorStartValue();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public AggregatorFactory getAggregator()
|
||||
{
|
||||
|
|
|
@ -35,17 +35,17 @@ public class HistogramAggregator implements Aggregator
|
|||
}
|
||||
};
|
||||
|
||||
static Object combineHistograms(Object lhs, Object rhs) {
|
||||
static Object combineHistograms(Object lhs, Object rhs)
|
||||
{
|
||||
return ((Histogram) lhs).fold((Histogram) rhs);
|
||||
}
|
||||
|
||||
private final FloatColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private Histogram histogram;
|
||||
|
||||
public HistogramAggregator(String name, FloatColumnSelector selector, float[] breaks) {
|
||||
this.name = name;
|
||||
public HistogramAggregator(FloatColumnSelector selector, float[] breaks)
|
||||
{
|
||||
this.selector = selector;
|
||||
this.histogram = new Histogram(breaks);
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ public class HistogramAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -66,11 +66,7 @@ public class HistogramAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new HistogramAggregator(
|
||||
name,
|
||||
metricFactory.makeFloatColumnSelector(fieldName),
|
||||
breaks
|
||||
);
|
||||
return new HistogramAggregator(metricFactory.makeFloatColumnSelector(fieldName), breaks);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -159,7 +155,7 @@ public class HistogramAggregatorFactory extends AggregatorFactory
|
|||
.allocate(1 + fieldNameBytes.length + Floats.BYTES * breaks.length)
|
||||
.put(CACHE_TYPE_ID)
|
||||
.put(fieldNameBytes)
|
||||
.put((byte)0xFF);
|
||||
.put((byte) 0xFF);
|
||||
buf.asFloatBuffer().put(breaks);
|
||||
|
||||
return buf.array();
|
||||
|
@ -177,12 +173,6 @@ public class HistogramAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES * (breaks.length + 1) + Floats.BYTES * 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return new Histogram(breaks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -37,15 +37,13 @@ public class JavaScriptAggregator implements Aggregator
|
|||
public void close();
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final ObjectColumnSelector[] selectorList;
|
||||
private final ScriptAggregator script;
|
||||
|
||||
private volatile double current;
|
||||
|
||||
public JavaScriptAggregator(String name, List<ObjectColumnSelector> selectorList, ScriptAggregator script)
|
||||
public JavaScriptAggregator(List<ObjectColumnSelector> selectorList, ScriptAggregator script)
|
||||
{
|
||||
this.name = name;
|
||||
this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{});
|
||||
this.script = script;
|
||||
|
||||
|
@ -85,7 +83,7 @@ public class JavaScriptAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -96,7 +96,6 @@ public class JavaScriptAggregatorFactory extends AggregatorFactory
|
|||
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
|
||||
{
|
||||
return new JavaScriptAggregator(
|
||||
name,
|
||||
Lists.transform(
|
||||
fieldNames,
|
||||
new com.google.common.base.Function<String, ObjectColumnSelector>()
|
||||
|
@ -263,12 +262,6 @@ public class JavaScriptAggregatorFactory extends AggregatorFactory
|
|||
return Doubles.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return getCompiledScript().reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -35,13 +35,11 @@ public class LongMaxAggregator implements Aggregator
|
|||
}
|
||||
|
||||
private final LongColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private long max;
|
||||
|
||||
public LongMaxAggregator(String name, LongColumnSelector selector)
|
||||
public LongMaxAggregator(LongColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
reset();
|
||||
|
@ -80,13 +78,13 @@ public class LongMaxAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new LongMaxAggregator(name, selector);
|
||||
return new LongMaxAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,8 @@ public class LongMaxAggregatorFactory extends AggregatorFactory
|
|||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||
Preconditions.checkArgument(
|
||||
fieldName == null ^ expression == null,
|
||||
"Must have a valid, non-null fieldName or expression");
|
||||
"Must have a valid, non-null fieldName or expression"
|
||||
);
|
||||
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
|
@ -69,7 +70,7 @@ public class LongMaxAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new LongMaxAggregator(name, getLongColumnSelector(metricFactory));
|
||||
return new LongMaxAggregator(getLongColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,12 +181,6 @@ public class LongMaxAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return Long.MIN_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -35,13 +35,11 @@ public class LongMinAggregator implements Aggregator
|
|||
}
|
||||
|
||||
private final LongColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private long min;
|
||||
|
||||
public LongMinAggregator(String name, LongColumnSelector selector)
|
||||
public LongMinAggregator(LongColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
reset();
|
||||
|
@ -80,13 +78,13 @@ public class LongMinAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return this.name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new LongMinAggregator(name, selector);
|
||||
return new LongMinAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,8 @@ public class LongMinAggregatorFactory extends AggregatorFactory
|
|||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||
Preconditions.checkArgument(
|
||||
fieldName == null ^ expression == null,
|
||||
"Must have a valid, non-null fieldName or expression");
|
||||
"Must have a valid, non-null fieldName or expression"
|
||||
);
|
||||
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
|
@ -69,7 +70,7 @@ public class LongMinAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new LongMinAggregator(name, getLongColumnSelector(metricFactory));
|
||||
return new LongMinAggregator(getLongColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,12 +181,6 @@ public class LongMinAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -37,18 +37,17 @@ public class LongSumAggregator implements Aggregator
|
|||
}
|
||||
};
|
||||
|
||||
static long combineValues(Object lhs, Object rhs) {
|
||||
static long combineValues(Object lhs, Object rhs)
|
||||
{
|
||||
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
|
||||
}
|
||||
|
||||
private final LongColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private long sum;
|
||||
|
||||
public LongSumAggregator(String name, LongColumnSelector selector)
|
||||
public LongSumAggregator(LongColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
this.sum = 0;
|
||||
|
@ -87,13 +86,13 @@ public class LongSumAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new LongSumAggregator(name, selector);
|
||||
return new LongSumAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,8 @@ public class LongSumAggregatorFactory extends AggregatorFactory
|
|||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||
Preconditions.checkArgument(
|
||||
fieldName == null ^ expression == null,
|
||||
"Must have a valid, non-null fieldName or expression");
|
||||
"Must have a valid, non-null fieldName or expression"
|
||||
);
|
||||
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
|
@ -69,7 +70,7 @@ public class LongSumAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new LongSumAggregator(name, getLongColumnSelector(metricFactory));
|
||||
return new LongSumAggregator(getLongColumnSelector(metricFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,12 +181,6 @@ public class LongSumAggregatorFactory extends AggregatorFactory
|
|||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -34,7 +34,6 @@ public class CardinalityAggregator implements Aggregator
|
|||
{
|
||||
private static final String NULL_STRING = "\u0000";
|
||||
|
||||
private final String name;
|
||||
private final List<DimensionSelector> selectorList;
|
||||
private final boolean byRow;
|
||||
|
||||
|
@ -87,12 +86,10 @@ public class CardinalityAggregator implements Aggregator
|
|||
private HyperLogLogCollector collector;
|
||||
|
||||
public CardinalityAggregator(
|
||||
String name,
|
||||
List<DimensionSelector> selectorList,
|
||||
boolean byRow
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selectorList = selectorList;
|
||||
this.collector = HyperLogLogCollector.makeLatestCollector();
|
||||
this.byRow = byRow;
|
||||
|
@ -135,13 +132,13 @@ public class CardinalityAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new CardinalityAggregator(name, selectorList, byRow);
|
||||
return new CardinalityAggregator(selectorList, byRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -139,7 +139,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
return Aggregators.noopAggregator();
|
||||
}
|
||||
|
||||
return new CardinalityAggregator(name, selectors, byRow);
|
||||
return new CardinalityAggregator(selectors, byRow);
|
||||
}
|
||||
|
||||
|
||||
|
@ -310,12 +310,6 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -26,17 +26,14 @@ import io.druid.segment.ObjectColumnSelector;
|
|||
*/
|
||||
public class HyperUniquesAggregator implements Aggregator
|
||||
{
|
||||
private final String name;
|
||||
private final ObjectColumnSelector selector;
|
||||
|
||||
private HyperLogLogCollector collector;
|
||||
|
||||
public HyperUniquesAggregator(
|
||||
String name,
|
||||
ObjectColumnSelector selector
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
||||
this.collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
@ -76,13 +73,13 @@ public class HyperUniquesAggregator implements Aggregator
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
throw new UnsupportedOperationException("getName is deprecated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator clone()
|
||||
{
|
||||
return new HyperUniquesAggregator(name, selector);
|
||||
return new HyperUniquesAggregator(selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -77,7 +77,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
|
||||
final Class classOfObject = selector.classOfObject();
|
||||
if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
|
||||
return new HyperUniquesAggregator(name, selector);
|
||||
return new HyperUniquesAggregator(selector);
|
||||
}
|
||||
|
||||
throw new IAE(
|
||||
|
@ -209,12 +209,6 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
return HyperLogLogCollector.getLatestNumBytesForDenseStorage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return HyperLogLogCollector.makeLatestCollector();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -61,7 +61,13 @@ public class TimeseriesQueryEngine
|
|||
@Override
|
||||
public Result<TimeseriesResultValue> apply(Cursor cursor)
|
||||
{
|
||||
Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
|
||||
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
|
||||
String[] aggregatorNames = new String[aggregatorSpecs.size()];
|
||||
|
||||
for (int i = 0; i < aggregatorSpecs.size(); i++) {
|
||||
aggregators[i] = aggregatorSpecs.get(i).factorize(cursor);
|
||||
aggregatorNames[i] = aggregatorSpecs.get(i).getName();
|
||||
}
|
||||
|
||||
if (skipEmptyBuckets && cursor.isDone()) {
|
||||
return null;
|
||||
|
@ -77,8 +83,8 @@ public class TimeseriesQueryEngine
|
|||
|
||||
TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());
|
||||
|
||||
for (Aggregator aggregator : aggregators) {
|
||||
bob.addMetric(aggregator);
|
||||
for (int i = 0; i < aggregatorSpecs.size(); i++) {
|
||||
bob.addMetric(aggregatorNames[i], aggregators[i]);
|
||||
}
|
||||
|
||||
Result<TimeseriesResultValue> retVal = bob.build();
|
||||
|
|
|
@ -42,15 +42,9 @@ public class TimeseriesResultBuilder
|
|||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public TimeseriesResultBuilder addMetric(Aggregator aggregator)
|
||||
public TimeseriesResultBuilder addMetric(String name, Aggregator aggregator)
|
||||
{
|
||||
metricValues.put(aggregator.getName(), aggregator.get());
|
||||
return this;
|
||||
}
|
||||
|
||||
public TimeseriesResultBuilder addMetric(PostAggregator postAggregator)
|
||||
{
|
||||
metricValues.put(postAggregator.getName(), postAggregator.compute(metricValues));
|
||||
metricValues.put(name, aggregator.get());
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -179,11 +179,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
|
||||
if (null != priorIndex) {
|
||||
aggs = concurrentGet(priorIndex);
|
||||
doAggregate(aggs, rowContainer, row, reportParseExceptions);
|
||||
doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);
|
||||
} else {
|
||||
aggs = new Aggregator[metrics.length];
|
||||
factorizeAggs(metrics, aggs, rowContainer, row);
|
||||
doAggregate(aggs, rowContainer, row, reportParseExceptions);
|
||||
doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);
|
||||
|
||||
final Integer rowIndex = indexIncrement.getAndIncrement();
|
||||
concurrentSet(rowIndex, aggs);
|
||||
|
@ -198,7 +198,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
} else {
|
||||
// We lost a race
|
||||
aggs = concurrentGet(prev);
|
||||
doAggregate(aggs, rowContainer, row, reportParseExceptions);
|
||||
doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);
|
||||
// Free up the misfire
|
||||
concurrentRemove(rowIndex);
|
||||
// This is expected to occur ~80% of the time in the worst scenarios
|
||||
|
@ -224,6 +224,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
}
|
||||
|
||||
private void doAggregate(
|
||||
AggregatorFactory[] metrics,
|
||||
Aggregator[] aggs,
|
||||
ThreadLocal<InputRow> rowContainer,
|
||||
InputRow row,
|
||||
|
@ -232,7 +233,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
{
|
||||
rowContainer.set(row);
|
||||
|
||||
for (Aggregator agg : aggs) {
|
||||
for (int i = 0 ; i < aggs.length ; i++) {
|
||||
final Aggregator agg = aggs[i];
|
||||
synchronized (agg) {
|
||||
try {
|
||||
agg.aggregate();
|
||||
|
@ -240,9 +242,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
catch (ParseException e) {
|
||||
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
|
||||
if (reportParseExceptions) {
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", metrics[i].getName());
|
||||
} else {
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,9 +31,7 @@ public class CountAggregatorTest
|
|||
@Test
|
||||
public void testAggregate()
|
||||
{
|
||||
CountAggregator agg = new CountAggregator("billy");
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
CountAggregator agg = new CountAggregator();
|
||||
|
||||
Assert.assertEquals(0L, agg.get());
|
||||
Assert.assertEquals(0L, agg.get());
|
||||
|
@ -51,7 +49,7 @@ public class CountAggregatorTest
|
|||
@Test
|
||||
public void testComparator()
|
||||
{
|
||||
CountAggregator agg = new CountAggregator("billy");
|
||||
CountAggregator agg = new CountAggregator();
|
||||
|
||||
Object first = agg.get();
|
||||
agg.aggregate();
|
||||
|
|
|
@ -59,8 +59,6 @@ public class DoubleMaxAggregationTest
|
|||
{
|
||||
DoubleMaxAggregator agg = (DoubleMaxAggregator) doubleMaxAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
|
|
|
@ -59,8 +59,6 @@ public class DoubleMinAggregationTest
|
|||
{
|
||||
DoubleMinAggregator agg = (DoubleMinAggregator) doubleMinAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
|
|
|
@ -39,9 +39,7 @@ public class DoubleSumAggregatorTest
|
|||
{
|
||||
final float[] values = {0.15f, 0.27f};
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
|
||||
DoubleSumAggregator agg = new DoubleSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
DoubleSumAggregator agg = new DoubleSumAggregator(selector);
|
||||
|
||||
double expectedFirst = new Float(values[0]).doubleValue();
|
||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||
|
@ -63,9 +61,7 @@ public class DoubleSumAggregatorTest
|
|||
public void testComparator()
|
||||
{
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(new float[]{0.15f, 0.27f});
|
||||
DoubleSumAggregator agg = new DoubleSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
DoubleSumAggregator agg = new DoubleSumAggregator(selector);
|
||||
|
||||
Object first = agg.get();
|
||||
agg.aggregate();
|
||||
|
|
|
@ -75,8 +75,6 @@ public class FilteredAggregatorTest
|
|||
makeColumnSelector(selector)
|
||||
);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
double expectedFirst = new Float(values[0]).doubleValue();
|
||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||
double expectedThird = expectedSecond;
|
||||
|
@ -235,8 +233,6 @@ public class FilteredAggregatorTest
|
|||
makeColumnSelector(selector)
|
||||
);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
double expectedFirst = new Float(values[0]).doubleValue();
|
||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||
double expectedThird = expectedSecond + new Float(values[2]).doubleValue();
|
||||
|
@ -359,8 +355,6 @@ public class FilteredAggregatorTest
|
|||
makeColumnSelector(selector)
|
||||
);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
double expectedFirst = new Float(values[0]).doubleValue();
|
||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||
double expectedThird = expectedSecond;
|
||||
|
|
|
@ -59,9 +59,7 @@ public class HistogramAggregatorTest
|
|||
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
|
||||
|
||||
HistogramAggregator agg = new HistogramAggregator("billy", selector, breaks);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
HistogramAggregator agg = new HistogramAggregator(selector, breaks);
|
||||
|
||||
Assert.assertArrayEquals(new long[]{0,0,0,0,0,0}, ((Histogram)agg.get()).bins);
|
||||
Assert.assertArrayEquals(new long[]{0,0,0,0,0,0}, ((Histogram)agg.get()).bins);
|
||||
|
|
|
@ -53,7 +53,6 @@ public class JavaScriptAggregatorBenchmark extends SimpleBenchmark
|
|||
Map<String, String> script = scriptDoubleSum;
|
||||
|
||||
jsAggregator = new JavaScriptAggregator(
|
||||
"billy",
|
||||
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
|
||||
JavaScriptAggregatorFactory.compileScript(
|
||||
script.get("fnAggregate"),
|
||||
|
@ -62,7 +61,7 @@ public class JavaScriptAggregatorBenchmark extends SimpleBenchmark
|
|||
)
|
||||
);
|
||||
|
||||
doubleAgg = new DoubleSumAggregator("billy", selector);
|
||||
doubleAgg = new DoubleSumAggregator(selector);
|
||||
}
|
||||
|
||||
public double timeJavaScriptDoubleSum(int reps)
|
||||
|
|
|
@ -138,7 +138,6 @@ public class JavaScriptAggregatorTest
|
|||
Map<String, String> script = sumLogATimesBPlusTen;
|
||||
|
||||
JavaScriptAggregator agg = new JavaScriptAggregator(
|
||||
"billy",
|
||||
Arrays.<ObjectColumnSelector>asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)),
|
||||
JavaScriptAggregatorFactory.compileScript(
|
||||
script.get("fnAggregate"),
|
||||
|
@ -149,8 +148,6 @@ public class JavaScriptAggregatorTest
|
|||
|
||||
agg.reset();
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
double val = 10.;
|
||||
Assert.assertEquals(val, agg.get());
|
||||
Assert.assertEquals(val, agg.get());
|
||||
|
@ -213,7 +210,6 @@ public class JavaScriptAggregatorTest
|
|||
Map<String, String> script = scriptDoubleSum;
|
||||
|
||||
JavaScriptAggregator agg = new JavaScriptAggregator(
|
||||
"billy",
|
||||
Collections.<ObjectColumnSelector>singletonList(null),
|
||||
JavaScriptAggregatorFactory.compileScript(
|
||||
script.get("fnAggregate"),
|
||||
|
@ -224,8 +220,6 @@ public class JavaScriptAggregatorTest
|
|||
|
||||
final double val = 0;
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
agg.reset();
|
||||
Assert.assertEquals(val, agg.get());
|
||||
Assert.assertEquals(val, agg.get());
|
||||
|
@ -247,7 +241,6 @@ public class JavaScriptAggregatorTest
|
|||
{
|
||||
final TestObjectColumnSelector ocs = new TestObjectColumnSelector("what", null, new String[]{"hey", "there"});
|
||||
final JavaScriptAggregator agg = new JavaScriptAggregator(
|
||||
"billy",
|
||||
Collections.<ObjectColumnSelector>singletonList(ocs),
|
||||
JavaScriptAggregatorFactory.compileScript(
|
||||
"function aggregate(current, a) { if (Array.isArray(a)) { return current + a.length; } else if (typeof a === 'string') { return current + 1; } else { return current; } }",
|
||||
|
@ -258,8 +251,6 @@ public class JavaScriptAggregatorTest
|
|||
|
||||
agg.reset();
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
double val = 0.;
|
||||
Assert.assertEquals(val, agg.get());
|
||||
Assert.assertEquals(val, agg.get());
|
||||
|
@ -343,7 +334,6 @@ public class JavaScriptAggregatorTest
|
|||
|
||||
Map<String, String> script = scriptDoubleSum;
|
||||
JavaScriptAggregator aggRhino = new JavaScriptAggregator(
|
||||
"billy",
|
||||
Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}),
|
||||
JavaScriptAggregatorFactory.compileScript(
|
||||
script.get("fnAggregate"),
|
||||
|
@ -352,7 +342,7 @@ public class JavaScriptAggregatorTest
|
|||
)
|
||||
);
|
||||
|
||||
DoubleSumAggregator doubleAgg = new DoubleSumAggregator("billy", selector);
|
||||
DoubleSumAggregator doubleAgg = new DoubleSumAggregator(selector);
|
||||
|
||||
// warmup
|
||||
int i = 0;
|
||||
|
|
|
@ -59,8 +59,6 @@ public class LongMaxAggregationTest
|
|||
{
|
||||
LongMaxAggregator agg = (LongMaxAggregator)longMaxAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
|
|
|
@ -59,8 +59,6 @@ public class LongMinAggregationTest
|
|||
{
|
||||
LongMinAggregator agg = (LongMinAggregator)longMinAggFactory.factorize(colSelectorFactory);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
aggregate(selector, agg);
|
||||
|
|
|
@ -38,9 +38,7 @@ public class LongSumAggregatorTest
|
|||
public void testAggregate()
|
||||
{
|
||||
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{24L, 20L});
|
||||
LongSumAggregator agg = new LongSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
LongSumAggregator agg = new LongSumAggregator(selector);
|
||||
|
||||
Assert.assertEquals(0L, agg.get());
|
||||
Assert.assertEquals(0L, agg.get());
|
||||
|
@ -59,9 +57,7 @@ public class LongSumAggregatorTest
|
|||
public void testComparator()
|
||||
{
|
||||
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{18293L});
|
||||
LongSumAggregator agg = new LongSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
LongSumAggregator agg = new LongSumAggregator(selector);
|
||||
|
||||
Object first = agg.get();
|
||||
agg.aggregate();
|
||||
|
|
|
@ -42,14 +42,14 @@ public class MetricManipulatorFnsTest
|
|||
final ArrayList<Object[]> constructorArrays = new ArrayList<>();
|
||||
final long longVal = 13789;
|
||||
LongMinAggregator longMinAggregator = new LongMinAggregator(
|
||||
NAME, new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return longVal;
|
||||
}
|
||||
}
|
||||
new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return longVal;
|
||||
}
|
||||
}
|
||||
);
|
||||
LongMinAggregatorFactory longMinAggregatorFactory = new LongMinAggregatorFactory(NAME, FIELD);
|
||||
constructorArrays.add(
|
||||
|
@ -81,14 +81,14 @@ public class MetricManipulatorFnsTest
|
|||
|
||||
LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory(NAME, FIELD);
|
||||
LongSumAggregator longSumAggregator = new LongSumAggregator(
|
||||
NAME, new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return longVal;
|
||||
}
|
||||
}
|
||||
new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return longVal;
|
||||
}
|
||||
}
|
||||
);
|
||||
constructorArrays.add(
|
||||
new Object[]{
|
||||
|
|
|
@ -310,7 +310,6 @@ public class CardinalityAggregatorTest
|
|||
public void testAggregateRows() throws Exception
|
||||
{
|
||||
CardinalityAggregator agg = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorList,
|
||||
true
|
||||
);
|
||||
|
@ -326,7 +325,6 @@ public class CardinalityAggregatorTest
|
|||
public void testAggregateValues() throws Exception
|
||||
{
|
||||
CardinalityAggregator agg = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorList,
|
||||
false
|
||||
);
|
||||
|
@ -385,8 +383,8 @@ public class CardinalityAggregatorTest
|
|||
List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
|
||||
List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
|
||||
|
||||
CardinalityAggregator agg1 = new CardinalityAggregator("billy", selector1, true);
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator("billy", selector2, true);
|
||||
CardinalityAggregator agg1 = new CardinalityAggregator(selector1, true);
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator(selector2, true);
|
||||
|
||||
for (int i = 0; i < values1.size(); ++i) {
|
||||
aggregate(selector1, agg1);
|
||||
|
@ -416,8 +414,8 @@ public class CardinalityAggregatorTest
|
|||
List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
|
||||
List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
|
||||
|
||||
CardinalityAggregator agg1 = new CardinalityAggregator("billy", selector1, false);
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator("billy", selector2, false);
|
||||
CardinalityAggregator agg1 = new CardinalityAggregator(selector1, false);
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator(selector2, false);
|
||||
|
||||
for (int i = 0; i < values1.size(); ++i) {
|
||||
aggregate(selector1, agg1);
|
||||
|
@ -445,7 +443,6 @@ public class CardinalityAggregatorTest
|
|||
public void testAggregateRowsWithExtraction() throws Exception
|
||||
{
|
||||
CardinalityAggregator agg = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorListWithExtraction,
|
||||
true
|
||||
);
|
||||
|
@ -455,7 +452,6 @@ public class CardinalityAggregatorTest
|
|||
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
|
||||
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorListConstantVal,
|
||||
true
|
||||
);
|
||||
|
@ -469,7 +465,6 @@ public class CardinalityAggregatorTest
|
|||
public void testAggregateValuesWithExtraction() throws Exception
|
||||
{
|
||||
CardinalityAggregator agg = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorListWithExtraction,
|
||||
false
|
||||
);
|
||||
|
@ -479,7 +474,6 @@ public class CardinalityAggregatorTest
|
|||
Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05);
|
||||
|
||||
CardinalityAggregator agg2 = new CardinalityAggregator(
|
||||
"billy",
|
||||
selectorListConstantVal,
|
||||
false
|
||||
);
|
||||
|
|
|
@ -36,13 +36,4 @@ public class HyperUniquesAggregatorFactoryTest
|
|||
Object v0 = aggregatorFactory.deserialize(V0_BASE64);
|
||||
Assert.assertEquals("deserialized value is HLLCV0", HLLCV0.class, v0.getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCombineStartValueV0() throws Exception
|
||||
{
|
||||
Object combined = aggregatorFactory.getAggregatorStartValue();
|
||||
aggregatorFactory.combine(combined, aggregatorFactory.deserialize(V0_BASE64));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -39,14 +39,15 @@ public class ArithmeticPostAggregatorTest
|
|||
@Test
|
||||
public void testCompute()
|
||||
{
|
||||
final String aggName = "rows";
|
||||
ArithmeticPostAggregator arithmeticPostAggregator;
|
||||
ExpressionPostAggregator expressionPostAggregator;
|
||||
CountAggregator agg = new CountAggregator("rows");
|
||||
CountAggregator agg = new CountAggregator();
|
||||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
Map<String, Object> metricValues = new HashMap<String, Object>();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put(aggName, agg.get());
|
||||
|
||||
List<PostAggregator> postAggregatorList =
|
||||
Lists.newArrayList(
|
||||
|
@ -86,10 +87,11 @@ public class ArithmeticPostAggregatorTest
|
|||
@Test
|
||||
public void testComparator()
|
||||
{
|
||||
final String aggName = "rows";
|
||||
ArithmeticPostAggregator arithmeticPostAggregator;
|
||||
CountAggregator agg = new CountAggregator("rows");
|
||||
CountAggregator agg = new CountAggregator();
|
||||
Map<String, Object> metricValues = new HashMap<String, Object>();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put(aggName, agg.get());
|
||||
|
||||
List<PostAggregator> postAggregatorList =
|
||||
Lists.newArrayList(
|
||||
|
@ -107,7 +109,7 @@ public class ArithmeticPostAggregatorTest
|
|||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put(aggName, agg.get());
|
||||
Object after = arithmeticPostAggregator.compute(metricValues);
|
||||
|
||||
Assert.assertEquals(-1, comp.compare(before, after));
|
||||
|
|
|
@ -33,18 +33,19 @@ public class FieldAccessPostAggregatorTest
|
|||
@Test
|
||||
public void testCompute()
|
||||
{
|
||||
final String aggName = "rows";
|
||||
FieldAccessPostAggregator fieldAccessPostAggregator;
|
||||
|
||||
fieldAccessPostAggregator = new FieldAccessPostAggregator("To be, or not to be, that is the question:", "rows");
|
||||
CountAggregator agg = new CountAggregator("rows");
|
||||
CountAggregator agg = new CountAggregator();
|
||||
Map<String, Object> metricValues = new HashMap<String, Object>();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put(aggName, agg.get());
|
||||
Assert.assertEquals(new Long(0L), fieldAccessPostAggregator.compute(metricValues));
|
||||
|
||||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
agg.aggregate();
|
||||
metricValues.put(agg.getName(), agg.get());
|
||||
metricValues.put(aggName, agg.get());
|
||||
Assert.assertEquals(new Long(3L), fieldAccessPostAggregator.compute(metricValues));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,9 +142,9 @@ public class SpecificSegmentQueryRunnerTest
|
|||
TimeseriesResultBuilder builder = new TimeseriesResultBuilder(
|
||||
new DateTime("2012-01-01T00:00:00Z")
|
||||
);
|
||||
CountAggregator rows = new CountAggregator("rows");
|
||||
CountAggregator rows = new CountAggregator();
|
||||
rows.aggregate();
|
||||
builder.addMetric(rows);
|
||||
builder.addMetric("rows", rows);
|
||||
final Result<TimeseriesResultValue> value = builder.build();
|
||||
|
||||
final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner(
|
||||
|
|
Loading…
Reference in New Issue