diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java index 0e9b1f7f41f..55259102dfd 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.ReducerFactory; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -31,6 +33,7 @@ import java.io.IOException; public class DerivativeParser implements Reducer.Parser { public static final ParseField BUCKETS_PATH = new ParseField("bucketsPath"); + public static final ParseField FORMAT = new ParseField("format"); @Override public String type() { @@ -42,6 +45,7 @@ public class DerivativeParser implements Reducer.Parser { XContentParser.Token token; String currentFieldName = null; String bucketsPath = null; + String format = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -49,6 +53,8 @@ public class DerivativeParser implements Reducer.Parser { } else if (token == XContentParser.Token.VALUE_STRING) { if (BUCKETS_PATH.match(currentFieldName)) { bucketsPath = parser.text(); + } else if (FORMAT.match(currentFieldName)) { + format = parser.text(); } else { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "]."); @@ -63,7 +69,12 @@ public class DerivativeParser implements Reducer.Parser { + "] for derivative aggregation [" + reducerName + "]"); } - return new DerivativeReducer.Factory(reducerName, bucketsPath); + ValueFormatter formatter = null; + if (format != null) { + formatter = ValueFormat.Patternable.Number.format(format).formatter(); + } + + return new DerivativeReducer.Factory(reducerName, bucketsPath, formatter); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java index 9a707687cc2..26f40b2824d 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java @@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.reducers.derivative; import com.google.common.base.Function; import com.google.common.collect.Lists; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.aggregations.Aggregation; @@ -39,6 +40,7 @@ import org.elasticsearch.search.aggregations.reducers.ReducerFactory; import org.elasticsearch.search.aggregations.reducers.ReducerStreams; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import java.io.IOException; import java.util.ArrayList; @@ -62,7 +64,6 @@ public class DerivativeReducer extends Reducer { ReducerStreams.registerStream(STREAM, TYPE.stream()); } - private String bucketsPath; private static final Function FUNCTION = new Function() { @Override public InternalAggregation apply(Aggregation input) { @@ -70,12 +71,16 @@ public class DerivativeReducer extends Reducer { } }; + private ValueFormatter formatter; + private String bucketsPath; + public DerivativeReducer() { } - public DerivativeReducer(String name, String bucketsPath, Map metadata) { + public DerivativeReducer(String name, String bucketsPath, @Nullable ValueFormatter formatter, Map metadata) { super(name, metadata); this.bucketsPath = bucketsPath; + this.formatter = formatter; } @Override @@ -97,9 +102,8 @@ public class DerivativeReducer extends Reducer { double diff = thisBucketValue - lastBucketValue; List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); - aggs.add(new InternalSimpleValue(name(), diff, null, new ArrayList(), metaData())); // NOCOMMIT implement formatter for derivative reducer - InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), - new InternalAggregations( + aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList(), metaData())); + InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations( aggs), bucket.getKeyed(), bucket.getFormatter()); newBuckets.add(newBucket); } else { @@ -136,17 +140,19 @@ public class DerivativeReducer extends Reducer { public static class Factory extends ReducerFactory { - private String bucketsPath; + private final String bucketsPath; + private final ValueFormatter formatter; - public Factory(String name, String field) { + public Factory(String name, String bucketsPath, @Nullable ValueFormatter formatter) { super(name, TYPE.name()); - this.bucketsPath = field; + this.bucketsPath = bucketsPath; + this.formatter = formatter; } @Override protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, Map metaData) throws IOException { - return new DerivativeReducer(name, bucketsPath, metaData); + return new DerivativeReducer(name, bucketsPath, formatter, metaData); } }