diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/BucketHelpers.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/BucketHelpers.java new file mode 100644 index 00000000000..145ff1dea1f --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/BucketHelpers.java @@ -0,0 +1,160 @@ +package org.elasticsearch.search.aggregations.reducers; + +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.InvalidAggregationPathException; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; +import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A set of static helpers to simplify working with aggregation buckets, in particular + * providing utilities that help reducers: the GapPolicy enum and resolveBucketValue. + */ +public class BucketHelpers { + + /** + * A gap policy determines how "holes" in a set of buckets should be handled. For example, + * a date_histogram might have empty buckets due to no data existing for that time interval. + * This can cause problems for operations like a derivative, which relies on a continuous + * function. 
+ * + * "insert_zeros": empty buckets will be filled with zeros for all metrics (resolveBucketValue returns 0.0) + * "ignore": empty buckets will simply be ignored (resolveBucketValue returns NaN) + */ + public static enum GapPolicy { + INSERT_ZEROS((byte) 0, "insert_zeros"), IGNORE((byte) 1, "ignore"); + + /** + * Parse a gap policy name into its GapPolicy constant; fails if zero or several policies match + * + * @param context SearchContext this is taking place in, passed to the SearchParseException on failure + * @param text GapPolicy in string format (e.g. "ignore") + * @return the single GapPolicy whose ParseField matches text, never null + */ + public static GapPolicy parse(SearchContext context, String text) { + GapPolicy result = null; + for (GapPolicy policy : values()) { + if (policy.parseField.match(text)) { + if (result == null) { + result = policy; + } else { + throw new ElasticsearchIllegalStateException("Text can be parsed to 2 different gap policies: text=[" + text + + "], " + "policies=" + Arrays.asList(result, policy)); + } + } + } + if (result == null) { + final List validNames = new ArrayList<>(); + for (GapPolicy policy : values()) { + validNames.add(policy.getName()); + } + throw new SearchParseException(context, "Invalid gap policy: [" + text + "], accepted values: " + validNames); + } + return result; + } + + private final byte id; + private final ParseField parseField; + + private GapPolicy(byte id, String name) { + this.id = id; + this.parseField = new ParseField(name); + } + + /** + * Serialize the GapPolicy to the output stream as its single id byte + * + * @param out stream that receives the id byte + * @throws IOException propagated from out.writeByte + */ + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(id); + } + + /** + * Deserialize the GapPolicy from the input stream; an unknown id raises IllegalStateException + * + * @param in stream to read the single id byte from + * @return GapPolicy whose id equals the byte that was read + * @throws IOException propagated from in.readByte + */ + public static GapPolicy readFrom(StreamInput in) throws IOException { + byte id = in.readByte(); + for (GapPolicy gapPolicy : values()) { + if (id == gapPolicy.id) { + return gapPolicy; + } + } + throw new IllegalStateException("Unknown GapPolicy with id [" + id + "]"); + } + + /** + * Return the name of the GapPolicy as reported by its ParseField + * + * @return 
value of parseField.getPreferredName() + */ + public String getName() { + return parseField.getPreferredName(); + } + } + + /** + * Given a path and a set of buckets, this method will return the value inside the agg at + * that path. This is used to extract values for use by reducers (e.g. a derivative might need + * the price for each bucket). If the extracted value is NaN or infinite, the GapPolicy picks the + * result: 0.0 for INSERT_ZEROS, NaN otherwise. A null or unsupported value throws AggregationExecutionException + * + * @param histo A series of agg buckets in the form of a histogram + * @param bucket A specific bucket that a value needs to be extracted from. This bucket should be present + * in the histo parameter + * @param aggPath The path to a particular value that needs to be extracted. This path should point to a metric + * inside the bucket (a Number or a single value numeric metric aggregation) + * @param gapPolicy The gap policy to apply if the extracted value is NaN or infinite + * @return The value at aggPath, or null if an InvalidAggregationPathException is caught (callers must null-check) + */ + public static Double resolveBucketValue(InternalHistogram histo, InternalHistogram.Bucket bucket, + String aggPath, GapPolicy gapPolicy) { + try { + Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(aggPath).getPathElementsAsStringList()); + if (propertyValue == null) { + throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() + + " must reference either a number value or a single value numeric metric aggregation"); + } else { + double value; + if (propertyValue instanceof Number) { + value = ((Number) propertyValue).doubleValue(); + } else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) { + value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value(); + } else { + throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() + + " must reference either a number value or a single value numeric metric aggregation"); + } + if (Double.isInfinite(value) || Double.isNaN(value)) { + switch (gapPolicy) { + case INSERT_ZEROS: + return 
0.0; + case IGNORE: + default: + return Double.NaN; + } + } else { + return value; + } + } + } catch (InvalidAggregationPathException e) { + return null; + } + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeBuilder.java index f868e673b1d..210d56d4a6f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeBuilder.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeBuilder.java @@ -21,10 +21,11 @@ package org.elasticsearch.search.aggregations.reducers.derivative; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.reducers.ReducerBuilder; -import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy; import java.io.IOException; +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; + public class DerivativeBuilder extends ReducerBuilder { private String format; diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java index 6b6b826ec6f..c4d3aa2a229 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeParser.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.ReducerFactory; -import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy; import org.elasticsearch.search.aggregations.support.format.ValueFormat; import 
org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.internal.SearchContext; @@ -33,6 +32,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; + public class DerivativeParser implements Reducer.Parser { public static final ParseField FORMAT = new ParseField("format"); diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java index c0d96f4056b..1130639a1a2 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/derivative/DerivativeReducer.java @@ -22,38 +22,30 @@ package org.elasticsearch.search.aggregations.reducers.derivative; import com.google.common.base.Function; import com.google.common.collect.Lists; -import org.elasticsearch.ElasticsearchIllegalStateException; + import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.InvalidAggregationPathException; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; 
-import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; -import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue; -import org.elasticsearch.search.aggregations.reducers.Reducer; -import org.elasticsearch.search.aggregations.reducers.ReducerFactory; -import org.elasticsearch.search.aggregations.reducers.ReducerStreams; +import org.elasticsearch.search.aggregations.reducers.*; import org.elasticsearch.search.aggregations.support.AggregationContext; -import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; -import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue; + public class DerivativeReducer extends Reducer { public final static Type TYPE = new Type("derivative"); @@ -105,7 +97,7 @@ public class DerivativeReducer extends Reducer { List newBuckets = new ArrayList<>(); Double lastBucketValue = null; for (InternalHistogram.Bucket bucket : buckets) { - Double thisBucketValue = resolveBucketValue(histo, bucket); + Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); if (lastBucketValue != null) { double diff = thisBucketValue - lastBucketValue; @@ -122,40 +114,6 @@ public class DerivativeReducer extends Reducer { return factory.create(histo.getName(), newBuckets, histo); } - private Double resolveBucketValue(InternalHistogram histo, InternalHistogram.Bucket bucket) { - try { - Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPaths()[0]) - .getPathElementsAsStringList()); - 
if (propertyValue == null) { - throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() - + " must reference either a number value or a single value numeric metric aggregation"); - } else { - double value; - if (propertyValue instanceof Number) { - value = ((Number) propertyValue).doubleValue(); - } else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) { - value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value(); - } else { - throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() - + " must reference either a number value or a single value numeric metric aggregation"); - } - if (Double.isInfinite(value) || Double.isNaN(value)) { - switch (gapPolicy) { - case INSERT_ZEROS: - return 0.0; - case IGNORE: - default: - return Double.NaN; - } - } else { - return value; - } - } - } catch (InvalidAggregationPathException e) { - return null; - } - } - @Override public void doReadFrom(StreamInput in) throws IOException { formatter = ValueFormatterStreams.readOptional(in); @@ -186,56 +144,4 @@ public class DerivativeReducer extends Reducer { } } - - public static enum GapPolicy { - INSERT_ZEROS((byte) 0, "insert_zeros"), IGNORE((byte) 1, "ignore"); - - public static GapPolicy parse(SearchContext context, String text) { - GapPolicy result = null; - for (GapPolicy policy : values()) { - if (policy.parseField.match(text)) { - if (result == null) { - result = policy; - } else { - throw new ElasticsearchIllegalStateException("Text can be parsed to 2 different gap policies: text=[" + text - + "], " + "policies=" + Arrays.asList(result, policy)); - } - } - } - if (result == null) { - final List validNames = new ArrayList<>(); - for (GapPolicy policy : values()) { - validNames.add(policy.getName()); - } - throw new SearchParseException(context, "Invalid gap policy: [" + text + "], accepted values: " + validNames); - } - return result; - } - - private final byte id; - 
private final ParseField parseField; - - private GapPolicy(byte id, String name) { - this.id = id; - this.parseField = new ParseField(name); - } - - public void writeTo(StreamOutput out) throws IOException { - out.writeByte(id); - } - - public static GapPolicy readFrom(StreamInput in) throws IOException { - byte id = in.readByte(); - for (GapPolicy gapPolicy : values()) { - if (id == gapPolicy.id) { - return gapPolicy; - } - } - throw new IllegalStateException("Unknown GapPolicy with id [" + id + "]"); - } - - public String getName() { - return parseField.getPreferredName(); - } - } } diff --git a/src/test/java/org/elasticsearch/search/aggregations/reducers/DerivativeTests.java b/src/test/java/org/elasticsearch/search/aggregations/reducers/DerivativeTests.java index 7d2d5500cd1..24a4c8cff5a 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/reducers/DerivativeTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/reducers/DerivativeTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.hamcrest.Matchers; @@ -509,7 +508,7 @@ public class DerivativeTests extends ElasticsearchIntegrationTest { .setQuery(matchAllQuery()) .addAggregation( histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(1).minDocCount(0) - .subAggregation(derivative("deriv").setBucketsPaths("_count").gapPolicy(GapPolicy.INSERT_ZEROS))) + .subAggregation(derivative("deriv").setBucketsPaths("_count").gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS))) 
.execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(14l));