Aggregations: x-axis units normalisation for derivative aggregation
This commit is contained in:
parent
67ed182347
commit
644fd00714
|
@ -194,3 +194,85 @@ And the following may be the response:
|
||||||
<1> No second derivative for the first two buckets since we need at least 2 data points from the first derivative to calculate the
|
<1> No second derivative for the first two buckets since we need at least 2 data points from the first derivative to calculate the
|
||||||
second derivative
|
second derivative
|
||||||
|
|
||||||
|
==== Units
|
||||||
|
|
||||||
|
The derivative aggregation allows the units of the derivative values to be specified. This returns an extra field in the response
|
||||||
|
`normalized_value` which reports the derivative value in the desired x-axis units. In the below example we calculate the derivative
|
||||||
|
of the total sales per month but ask for the derivative of the sales as in the units of sales per day:
|
||||||
|
|
||||||
|
[source,js]
|
||||||
|
--------------------------------------------------
|
||||||
|
{
|
||||||
|
"aggs" : {
|
||||||
|
"sales_per_month" : {
|
||||||
|
"date_histogram" : {
|
||||||
|
"field" : "date",
|
||||||
|
"interval" : "month"
|
||||||
|
},
|
||||||
|
"aggs": {
|
||||||
|
"sales": {
|
||||||
|
"sum": {
|
||||||
|
"field": "price"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"sales_deriv": {
|
||||||
|
"derivative": {
|
||||||
|
"buckets_paths": "sales",
|
||||||
|
"unit": "day" <1>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
--------------------------------------------------
|
||||||
|
|
||||||
|
<1> `unit` specifies what unit to use for the x-axis of the derivative calculation
|
||||||
|
|
||||||
|
And the following may be the response:
|
||||||
|
|
||||||
|
[source,js]
|
||||||
|
--------------------------------------------------
|
||||||
|
{
|
||||||
|
"aggregations": {
|
||||||
|
"sales_per_month": {
|
||||||
|
"buckets": [
|
||||||
|
{
|
||||||
|
"key_as_string": "2015/01/01 00:00:00",
|
||||||
|
"key": 1420070400000,
|
||||||
|
"doc_count": 3,
|
||||||
|
"sales": {
|
||||||
|
"value": 550
|
||||||
|
} <1>
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key_as_string": "2015/02/01 00:00:00",
|
||||||
|
"key": 1422748800000,
|
||||||
|
"doc_count": 2,
|
||||||
|
"sales": {
|
||||||
|
"value": 60
|
||||||
|
},
|
||||||
|
"sales_deriv": {
|
||||||
|
"value": -490, <1>
|
||||||
|
"normalized_value": -17.5 <2>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key_as_string": "2015/03/01 00:00:00",
|
||||||
|
"key": 1425168000000,
|
||||||
|
"doc_count": 2,
|
||||||
|
"sales": {
|
||||||
|
"value": 375
|
||||||
|
},
|
||||||
|
"sales_deriv": {
|
||||||
|
"value": 315,
|
||||||
|
"normalized_value": 10.16129032258065
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> `value` is reported in the original units of 'per month'
|
||||||
|
<2> `normalized_value` is reported in the desired units of 'per day'
|
||||||
|
|
|
@ -64,6 +64,7 @@ import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBuck
|
||||||
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketReducer;
|
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketReducer;
|
||||||
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketReducer;
|
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketReducer;
|
||||||
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
|
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.derivative.InternalDerivative;
|
||||||
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
|
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
|
||||||
import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;
|
import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;
|
||||||
|
|
||||||
|
@ -116,6 +117,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
|
||||||
|
|
||||||
// Reducers
|
// Reducers
|
||||||
DerivativeReducer.registerStreams();
|
DerivativeReducer.registerStreams();
|
||||||
|
InternalDerivative.registerStreams();
|
||||||
InternalSimpleValue.registerStreams();
|
InternalSimpleValue.registerStreams();
|
||||||
InternalBucketMetricValue.registerStreams();
|
InternalBucketMetricValue.registerStreams();
|
||||||
MaxBucketReducer.registerStreams();
|
MaxBucketReducer.registerStreams();
|
||||||
|
|
|
@ -49,10 +49,10 @@ public class DateHistogramParser implements Aggregator.Parser {
|
||||||
static final ParseField OFFSET = new ParseField("offset");
|
static final ParseField OFFSET = new ParseField("offset");
|
||||||
static final ParseField INTERVAL = new ParseField("interval");
|
static final ParseField INTERVAL = new ParseField("interval");
|
||||||
|
|
||||||
private final ImmutableMap<String, DateTimeUnit> dateFieldUnits;
|
public static final ImmutableMap<String, DateTimeUnit> DATE_FIELD_UNITS;
|
||||||
|
|
||||||
public DateHistogramParser() {
|
static {
|
||||||
dateFieldUnits = MapBuilder.<String, DateTimeUnit>newMapBuilder()
|
DATE_FIELD_UNITS = MapBuilder.<String, DateTimeUnit>newMapBuilder()
|
||||||
.put("year", DateTimeUnit.YEAR_OF_CENTURY)
|
.put("year", DateTimeUnit.YEAR_OF_CENTURY)
|
||||||
.put("1y", DateTimeUnit.YEAR_OF_CENTURY)
|
.put("1y", DateTimeUnit.YEAR_OF_CENTURY)
|
||||||
.put("quarter", DateTimeUnit.QUARTER)
|
.put("quarter", DateTimeUnit.QUARTER)
|
||||||
|
@ -184,7 +184,7 @@ public class DateHistogramParser implements Aggregator.Parser {
|
||||||
}
|
}
|
||||||
|
|
||||||
TimeZoneRounding.Builder tzRoundingBuilder;
|
TimeZoneRounding.Builder tzRoundingBuilder;
|
||||||
DateTimeUnit dateTimeUnit = dateFieldUnits.get(interval);
|
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(interval);
|
||||||
if (dateTimeUnit != null) {
|
if (dateTimeUnit != null) {
|
||||||
tzRoundingBuilder = TimeZoneRounding.builder(dateTimeUnit);
|
tzRoundingBuilder = TimeZoneRounding.builder(dateTimeUnit);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -310,6 +310,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Rounding getRounding() {
|
||||||
|
return emptyBucketInfo.rounding;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalHistogram<B> create(List<B> buckets) {
|
public InternalHistogram<B> create(List<B> buckets) {
|
||||||
return getFactory().create(buckets, this);
|
return getFactory().create(buckets, this);
|
||||||
|
|
|
@ -53,7 +53,8 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
|
||||||
|
|
||||||
private double value;
|
private double value;
|
||||||
|
|
||||||
InternalSimpleValue() {} // for serialization
|
protected InternalSimpleValue() {
|
||||||
|
} // for serialization
|
||||||
|
|
||||||
public InternalSimpleValue(String name, double value, @Nullable ValueFormatter formatter, List<Reducer> reducers, Map<String, Object> metaData) {
|
public InternalSimpleValue(String name, double value, @Nullable ValueFormatter formatter, List<Reducer> reducers, Map<String, Object> metaData) {
|
||||||
super(name, reducers, metaData);
|
super(name, reducers, metaData);
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.search.aggregations.reducers.derivative;
|
||||||
|
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.SimpleValue;
|
||||||
|
|
||||||
|
public interface Derivative extends SimpleValue {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the normalized value. If no normalised factor has been specified
|
||||||
|
* this method will return {@link #value()}
|
||||||
|
*
|
||||||
|
* @return the normalized value
|
||||||
|
*/
|
||||||
|
double normalizedValue();
|
||||||
|
}
|
|
@ -20,16 +20,17 @@
|
||||||
package org.elasticsearch.search.aggregations.reducers.derivative;
|
package org.elasticsearch.search.aggregations.reducers.derivative;
|
||||||
|
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||||
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
|
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
|
||||||
|
|
||||||
public class DerivativeBuilder extends ReducerBuilder<DerivativeBuilder> {
|
public class DerivativeBuilder extends ReducerBuilder<DerivativeBuilder> {
|
||||||
|
|
||||||
private String format;
|
private String format;
|
||||||
private GapPolicy gapPolicy;
|
private GapPolicy gapPolicy;
|
||||||
|
private String unit;
|
||||||
|
|
||||||
public DerivativeBuilder(String name) {
|
public DerivativeBuilder(String name) {
|
||||||
super(name, DerivativeReducer.TYPE.name());
|
super(name, DerivativeReducer.TYPE.name());
|
||||||
|
@ -45,6 +46,21 @@ public class DerivativeBuilder extends ReducerBuilder<DerivativeBuilder> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DerivativeBuilder unit(String unit) {
|
||||||
|
this.unit = unit;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the unit using the provided {@link DateHistogramInterval}. This
|
||||||
|
* method is only useful when calculating the derivative using a
|
||||||
|
* `date_histogram`
|
||||||
|
*/
|
||||||
|
public DerivativeBuilder unit(DateHistogramInterval unit) {
|
||||||
|
this.unit = unit.toString();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
if (format != null) {
|
if (format != null) {
|
||||||
|
@ -53,6 +69,9 @@ public class DerivativeBuilder extends ReducerBuilder<DerivativeBuilder> {
|
||||||
if (gapPolicy != null) {
|
if (gapPolicy != null) {
|
||||||
builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
|
builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
|
||||||
}
|
}
|
||||||
|
if (unit != null) {
|
||||||
|
builder.field(DerivativeParser.UNIT.getPreferredName(), unit);
|
||||||
|
}
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.search.aggregations.reducers.derivative;
|
package org.elasticsearch.search.aggregations.reducers.derivative;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.ParseField;
|
||||||
|
import org.elasticsearch.common.rounding.DateTimeUnit;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.search.SearchParseException;
|
import org.elasticsearch.search.SearchParseException;
|
||||||
|
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser;
|
||||||
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||||
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
||||||
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
||||||
|
@ -34,6 +38,10 @@ import java.util.List;
|
||||||
|
|
||||||
public class DerivativeParser implements Reducer.Parser {
|
public class DerivativeParser implements Reducer.Parser {
|
||||||
|
|
||||||
|
public static final ParseField FORMAT = new ParseField("format");
|
||||||
|
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
|
||||||
|
public static final ParseField UNIT = new ParseField("unit");
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String type() {
|
public String type() {
|
||||||
return DerivativeReducer.TYPE.name();
|
return DerivativeReducer.TYPE.name();
|
||||||
|
@ -45,6 +53,7 @@ public class DerivativeParser implements Reducer.Parser {
|
||||||
String currentFieldName = null;
|
String currentFieldName = null;
|
||||||
String[] bucketsPaths = null;
|
String[] bucketsPaths = null;
|
||||||
String format = null;
|
String format = null;
|
||||||
|
String units = null;
|
||||||
GapPolicy gapPolicy = GapPolicy.SKIP;
|
GapPolicy gapPolicy = GapPolicy.SKIP;
|
||||||
|
|
||||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||||
|
@ -57,6 +66,8 @@ public class DerivativeParser implements Reducer.Parser {
|
||||||
bucketsPaths = new String[] { parser.text() };
|
bucketsPaths = new String[] { parser.text() };
|
||||||
} else if (GAP_POLICY.match(currentFieldName)) {
|
} else if (GAP_POLICY.match(currentFieldName)) {
|
||||||
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
|
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
|
||||||
|
} else if (UNIT.match(currentFieldName)) {
|
||||||
|
units = parser.text();
|
||||||
} else {
|
} else {
|
||||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
|
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
|
||||||
+ currentFieldName + "].", parser.getTokenLocation());
|
+ currentFieldName + "].", parser.getTokenLocation());
|
||||||
|
@ -89,7 +100,20 @@ public class DerivativeParser implements Reducer.Parser {
|
||||||
formatter = ValueFormat.Patternable.Number.format(format).formatter();
|
formatter = ValueFormat.Patternable.Number.format(format).formatter();
|
||||||
}
|
}
|
||||||
|
|
||||||
return new DerivativeReducer.Factory(reducerName, bucketsPaths, formatter, gapPolicy);
|
Long xAxisUnits = null;
|
||||||
|
if (units != null) {
|
||||||
|
DateTimeUnit dateTimeUnit = DateHistogramParser.DATE_FIELD_UNITS.get(units);
|
||||||
|
if (dateTimeUnit != null) {
|
||||||
|
xAxisUnits = dateTimeUnit.field().getDurationField().getUnitMillis();
|
||||||
|
} else {
|
||||||
|
TimeValue timeValue = TimeValue.parseTimeValue(units, null);
|
||||||
|
if (timeValue != null) {
|
||||||
|
xAxisUnits = timeValue.getMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new DerivativeReducer.Factory(reducerName, bucketsPaths, formatter, gapPolicy, xAxisUnits);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
||||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||||
|
@ -32,12 +33,12 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
|
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||||
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||||
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
|
|
||||||
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
||||||
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
||||||
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
|
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
|
||||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -65,15 +66,17 @@ public class DerivativeReducer extends Reducer {
|
||||||
|
|
||||||
private ValueFormatter formatter;
|
private ValueFormatter formatter;
|
||||||
private GapPolicy gapPolicy;
|
private GapPolicy gapPolicy;
|
||||||
|
private Double xAxisUnits;
|
||||||
|
|
||||||
public DerivativeReducer() {
|
public DerivativeReducer() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public DerivativeReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
|
public DerivativeReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, Long xAxisUnits,
|
||||||
Map<String, Object> metadata) {
|
Map<String, Object> metadata) {
|
||||||
super(name, bucketsPaths, metadata);
|
super(name, bucketsPaths, metadata);
|
||||||
this.formatter = formatter;
|
this.formatter = formatter;
|
||||||
this.gapPolicy = gapPolicy;
|
this.gapPolicy = gapPolicy;
|
||||||
|
this.xAxisUnits = xAxisUnits == null ? null : (double) xAxisUnits;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -88,51 +91,83 @@ public class DerivativeReducer extends Reducer {
|
||||||
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
|
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
|
||||||
|
|
||||||
List newBuckets = new ArrayList<>();
|
List newBuckets = new ArrayList<>();
|
||||||
|
Long lastBucketKey = null;
|
||||||
Double lastBucketValue = null;
|
Double lastBucketValue = null;
|
||||||
for (InternalHistogram.Bucket bucket : buckets) {
|
for (InternalHistogram.Bucket bucket : buckets) {
|
||||||
|
Long thisBucketKey = resolveBucketKeyAsLong(bucket);
|
||||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
||||||
if (lastBucketValue != null) {
|
if (lastBucketValue != null) {
|
||||||
double diff = thisBucketValue - lastBucketValue;
|
double gradient = thisBucketValue - lastBucketValue;
|
||||||
|
double xDiff = -1;
|
||||||
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
|
if (xAxisUnits != null) {
|
||||||
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<Reducer>(), metaData()));
|
xDiff = (thisBucketKey - lastBucketKey) / xAxisUnits;
|
||||||
|
}
|
||||||
|
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(),
|
||||||
|
AGGREGATION_TRANFORM_FUNCTION));
|
||||||
|
aggs.add(new InternalDerivative(name(), gradient, xDiff, formatter, new ArrayList<Reducer>(), metaData()));
|
||||||
InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
|
InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
|
||||||
aggs), bucket.getKeyed(), bucket.getFormatter());
|
aggs), bucket.getKeyed(), bucket.getFormatter());
|
||||||
newBuckets.add(newBucket);
|
newBuckets.add(newBucket);
|
||||||
} else {
|
} else {
|
||||||
newBuckets.add(bucket);
|
newBuckets.add(bucket);
|
||||||
}
|
}
|
||||||
|
lastBucketKey = thisBucketKey;
|
||||||
lastBucketValue = thisBucketValue;
|
lastBucketValue = thisBucketValue;
|
||||||
}
|
}
|
||||||
return factory.create(newBuckets, histo);
|
return factory.create(newBuckets, histo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Long resolveBucketKeyAsLong(InternalHistogram.Bucket bucket) {
|
||||||
|
Object key = bucket.getKey();
|
||||||
|
if (key instanceof DateTime) {
|
||||||
|
return ((DateTime) key).getMillis();
|
||||||
|
} else if (key instanceof Number) {
|
||||||
|
return ((Number) key).longValue();
|
||||||
|
} else {
|
||||||
|
throw new AggregationExecutionException("Bucket keys must be either a Number or a DateTime for aggregation " + name()
|
||||||
|
+ ". Found bucket with key " + key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doReadFrom(StreamInput in) throws IOException {
|
public void doReadFrom(StreamInput in) throws IOException {
|
||||||
formatter = ValueFormatterStreams.readOptional(in);
|
formatter = ValueFormatterStreams.readOptional(in);
|
||||||
gapPolicy = GapPolicy.readFrom(in);
|
gapPolicy = GapPolicy.readFrom(in);
|
||||||
|
if (in.readBoolean()) {
|
||||||
|
xAxisUnits = in.readDouble();
|
||||||
|
} else {
|
||||||
|
xAxisUnits = null;
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doWriteTo(StreamOutput out) throws IOException {
|
public void doWriteTo(StreamOutput out) throws IOException {
|
||||||
ValueFormatterStreams.writeOptional(formatter, out);
|
ValueFormatterStreams.writeOptional(formatter, out);
|
||||||
gapPolicy.writeTo(out);
|
gapPolicy.writeTo(out);
|
||||||
|
boolean hasXAxisUnitsValue = xAxisUnits != null;
|
||||||
|
out.writeBoolean(hasXAxisUnitsValue);
|
||||||
|
if (hasXAxisUnitsValue) {
|
||||||
|
out.writeDouble(xAxisUnits);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Factory extends ReducerFactory {
|
public static class Factory extends ReducerFactory {
|
||||||
|
|
||||||
private final ValueFormatter formatter;
|
private final ValueFormatter formatter;
|
||||||
private GapPolicy gapPolicy;
|
private GapPolicy gapPolicy;
|
||||||
|
private Long xAxisUnits;
|
||||||
|
|
||||||
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy) {
|
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, Long xAxisUnits) {
|
||||||
super(name, TYPE.name(), bucketsPaths);
|
super(name, TYPE.name(), bucketsPaths);
|
||||||
this.formatter = formatter;
|
this.formatter = formatter;
|
||||||
this.gapPolicy = gapPolicy;
|
this.gapPolicy = gapPolicy;
|
||||||
|
this.xAxisUnits = xAxisUnits;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
|
protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
|
||||||
return new DerivativeReducer(name, bucketsPaths, formatter, gapPolicy, metaData);
|
return new DerivativeReducer(name, bucketsPaths, formatter, gapPolicy, xAxisUnits, metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.search.aggregations.reducers.derivative;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
||||||
|
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class InternalDerivative extends InternalSimpleValue implements Derivative {
|
||||||
|
|
||||||
|
public final static Type TYPE = new Type("derivative");
|
||||||
|
|
||||||
|
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||||
|
@Override
|
||||||
|
public InternalDerivative readResult(StreamInput in) throws IOException {
|
||||||
|
InternalDerivative result = new InternalDerivative();
|
||||||
|
result.readFrom(in);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public static void registerStreams() {
|
||||||
|
AggregationStreams.registerStream(STREAM, TYPE.stream());
|
||||||
|
}
|
||||||
|
|
||||||
|
private double normalizationFactor;
|
||||||
|
|
||||||
|
InternalDerivative() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public InternalDerivative(String name, double value, double normalizationFactor, ValueFormatter formatter, List<Reducer> reducers,
|
||||||
|
Map<String, Object> metaData) {
|
||||||
|
super(name, value, formatter, reducers, metaData);
|
||||||
|
this.normalizationFactor = normalizationFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double normalizedValue() {
|
||||||
|
return normalizationFactor > 0 ? (value() / normalizationFactor) : value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Type type() {
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getProperty(List<String> path) {
|
||||||
|
if (path.isEmpty()) {
|
||||||
|
return this;
|
||||||
|
} else if (path.size() == 1 && "value".equals(path.get(0))) {
|
||||||
|
return value();
|
||||||
|
} else if (path.size() == 1 && "normalized_value".equals(path.get(0))) {
|
||||||
|
return normalizedValue();
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||||
|
super.doWriteTo(out);
|
||||||
|
out.writeDouble(normalizationFactor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doReadFrom(StreamInput in) throws IOException {
|
||||||
|
super.doReadFrom(in);
|
||||||
|
normalizationFactor = in.readDouble();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
super.doXContentBody(builder, params);
|
||||||
|
|
||||||
|
if (normalizationFactor > 0) {
|
||||||
|
boolean hasValue = !(Double.isInfinite(normalizedValue()) || Double.isNaN(normalizedValue()));
|
||||||
|
builder.field("normalized_value", hasValue ? normalizedValue() : null);
|
||||||
|
if (hasValue && valueFormatter != null) {
|
||||||
|
builder.field("normalized_value_as_string", valueFormatter.format(normalizedValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||||
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.derivative.Derivative;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
@ -45,6 +46,7 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHist
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
|
||||||
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.derivative;
|
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.derivative;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||||
|
import static org.hamcrest.Matchers.closeTo;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.core.IsNull.notNullValue;
|
import static org.hamcrest.core.IsNull.notNullValue;
|
||||||
|
@ -146,6 +148,52 @@ public class DateDerivativeTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(docCountDeriv.value(), equalTo(1d));
|
assertThat(docCountDeriv.value(), equalTo(1d));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void singleValuedField_normalised() throws Exception {
|
||||||
|
SearchResponse response = client()
|
||||||
|
.prepareSearch("idx")
|
||||||
|
.addAggregation(
|
||||||
|
dateHistogram("histo").field("date").interval(DateHistogramInterval.MONTH).minDocCount(0)
|
||||||
|
.subAggregation(derivative("deriv").setBucketsPaths("_count").unit(DateHistogramInterval.DAY))).execute()
|
||||||
|
.actionGet();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
InternalHistogram deriv = response.getAggregations().get("histo");
|
||||||
|
assertThat(deriv, notNullValue());
|
||||||
|
assertThat(deriv.getName(), equalTo("histo"));
|
||||||
|
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||||
|
assertThat(buckets.size(), equalTo(3));
|
||||||
|
|
||||||
|
DateTime key = new DateTime(2012, 1, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
Histogram.Bucket bucket = buckets.get(0);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat((DateTime) bucket.getKey(), equalTo(key));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(1l));
|
||||||
|
Derivative docCountDeriv = bucket.getAggregations().get("deriv");
|
||||||
|
assertThat(docCountDeriv, nullValue());
|
||||||
|
|
||||||
|
key = new DateTime(2012, 2, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
bucket = buckets.get(1);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat((DateTime) bucket.getKey(), equalTo(key));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(2l));
|
||||||
|
docCountDeriv = bucket.getAggregations().get("deriv");
|
||||||
|
assertThat(docCountDeriv, notNullValue());
|
||||||
|
assertThat(docCountDeriv.value(), closeTo(1d, 0.00001));
|
||||||
|
assertThat(docCountDeriv.normalizedValue(), closeTo(1d / 31d, 0.00001));
|
||||||
|
|
||||||
|
key = new DateTime(2012, 3, 1, 0, 0, DateTimeZone.UTC);
|
||||||
|
bucket = buckets.get(2);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat((DateTime) bucket.getKey(), equalTo(key));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(3l));
|
||||||
|
docCountDeriv = bucket.getAggregations().get("deriv");
|
||||||
|
assertThat(docCountDeriv, notNullValue());
|
||||||
|
assertThat(docCountDeriv.value(), closeTo(1d, 0.00001));
|
||||||
|
assertThat(docCountDeriv.normalizedValue(), closeTo(1d / 29d, 0.00001));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void singleValuedField_WithSubAggregation() throws Exception {
|
public void singleValuedField_WithSubAggregation() throws Exception {
|
||||||
SearchResponse response = client()
|
SearchResponse response = client()
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.
|
||||||
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
|
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
|
||||||
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
||||||
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||||
|
import org.elasticsearch.search.aggregations.reducers.derivative.Derivative;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
@ -199,6 +200,49 @@ public class DerivativeTests extends ElasticsearchIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test first and second derivative on the sing
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void singleValuedField_normalised() {
|
||||||
|
|
||||||
|
SearchResponse response = client()
|
||||||
|
.prepareSearch("idx")
|
||||||
|
.addAggregation(
|
||||||
|
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
|
||||||
|
.subAggregation(derivative("deriv").setBucketsPaths("_count").unit("1"))
|
||||||
|
.subAggregation(derivative("2nd_deriv").setBucketsPaths("deriv").unit("10"))).execute().actionGet();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
InternalHistogram<Bucket> deriv = response.getAggregations().get("histo");
|
||||||
|
assertThat(deriv, notNullValue());
|
||||||
|
assertThat(deriv.getName(), equalTo("histo"));
|
||||||
|
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||||
|
assertThat(buckets.size(), equalTo(numValueBuckets));
|
||||||
|
|
||||||
|
for (int i = 0; i < numValueBuckets; ++i) {
|
||||||
|
Histogram.Bucket bucket = buckets.get(i);
|
||||||
|
checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, valueCounts[i]);
|
||||||
|
Derivative docCountDeriv = bucket.getAggregations().get("deriv");
|
||||||
|
if (i > 0) {
|
||||||
|
assertThat(docCountDeriv, notNullValue());
|
||||||
|
assertThat(docCountDeriv.value(), closeTo((double) (firstDerivValueCounts[i - 1]), 0.00001));
|
||||||
|
assertThat(docCountDeriv.normalizedValue(), closeTo((double) (firstDerivValueCounts[i - 1]) / 5, 0.00001));
|
||||||
|
} else {
|
||||||
|
assertThat(docCountDeriv, nullValue());
|
||||||
|
}
|
||||||
|
Derivative docCount2ndDeriv = bucket.getAggregations().get("2nd_deriv");
|
||||||
|
if (i > 1) {
|
||||||
|
assertThat(docCount2ndDeriv, notNullValue());
|
||||||
|
assertThat(docCount2ndDeriv.value(), closeTo((double) (secondDerivValueCounts[i - 2]), 0.00001));
|
||||||
|
assertThat(docCount2ndDeriv.normalizedValue(), closeTo((double) (secondDerivValueCounts[i - 2]) * 2, 0.00001));
|
||||||
|
} else {
|
||||||
|
assertThat(docCount2ndDeriv, nullValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void singleValueAggDerivative() throws Exception {
|
public void singleValueAggDerivative() throws Exception {
|
||||||
SearchResponse response = client()
|
SearchResponse response = client()
|
||||||
|
|
Loading…
Reference in New Issue