Move GapPolicy and resolveBucketValues() to static helper methods
Will allow many reducers to share the same helper functionality without repeating code. Chose to put these in static helpers instead of adding to Reducer base class. I can imagine other reducers that aren't time-based (or don't care about contiguous buckets), which would make things like gap policy useless. Since these seemed more like helpers than inherent traits of a Reducer, they went into their own static class. Closes #9954
This commit is contained in:
parent
dc03912731
commit
3131e01c9d
|
@ -0,0 +1,160 @@
|
|||
package org.elasticsearch.search.aggregations.reducers;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
||||
import org.elasticsearch.search.aggregations.InvalidAggregationPathException;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
|
||||
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A set of static helpers to simplify working with aggregation buckets, in particular
|
||||
* providing utilities that help reducers.
|
||||
*/
|
||||
public class BucketHelpers {
|
||||
|
||||
/**
|
||||
* A gap policy determines how "holes" in a set of buckets should be handled. For example,
|
||||
* a date_histogram might have empty buckets due to no data existing for that time interval.
|
||||
* This can cause problems for operations like a derivative, which relies on a continuous
|
||||
* function.
|
||||
*
|
||||
* "insert_zeros": empty buckets will be filled with zeros for all metrics
|
||||
* "ignore": empty buckets will simply be ignored
|
||||
*/
|
||||
public static enum GapPolicy {
|
||||
INSERT_ZEROS((byte) 0, "insert_zeros"), IGNORE((byte) 1, "ignore");
|
||||
|
||||
/**
|
||||
* Parse a string GapPolicy into the byte enum
|
||||
*
|
||||
* @param context SearchContext this is taking place in
|
||||
* @param text GapPolicy in string format (e.g. "ignore")
|
||||
* @return GapPolicy enum
|
||||
*/
|
||||
public static GapPolicy parse(SearchContext context, String text) {
|
||||
GapPolicy result = null;
|
||||
for (GapPolicy policy : values()) {
|
||||
if (policy.parseField.match(text)) {
|
||||
if (result == null) {
|
||||
result = policy;
|
||||
} else {
|
||||
throw new ElasticsearchIllegalStateException("Text can be parsed to 2 different gap policies: text=[" + text
|
||||
+ "], " + "policies=" + Arrays.asList(result, policy));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (result == null) {
|
||||
final List<String> validNames = new ArrayList<>();
|
||||
for (GapPolicy policy : values()) {
|
||||
validNames.add(policy.getName());
|
||||
}
|
||||
throw new SearchParseException(context, "Invalid gap policy: [" + text + "], accepted values: " + validNames);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private final byte id;
|
||||
private final ParseField parseField;
|
||||
|
||||
private GapPolicy(byte id, String name) {
|
||||
this.id = id;
|
||||
this.parseField = new ParseField(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize the GapPolicy to the output stream
|
||||
*
|
||||
* @param out
|
||||
* @throws IOException
|
||||
*/
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeByte(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize the GapPolicy from the input stream
|
||||
*
|
||||
* @param in
|
||||
* @return GapPolicy Enum
|
||||
* @throws IOException
|
||||
*/
|
||||
public static GapPolicy readFrom(StreamInput in) throws IOException {
|
||||
byte id = in.readByte();
|
||||
for (GapPolicy gapPolicy : values()) {
|
||||
if (id == gapPolicy.id) {
|
||||
return gapPolicy;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Unknown GapPolicy with id [" + id + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the english-formatted name of the GapPolicy
|
||||
*
|
||||
* @return English representation of GapPolicy
|
||||
*/
|
||||
public String getName() {
|
||||
return parseField.getPreferredName();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a path and a set of buckets, this method will return the value inside the agg at
|
||||
* that path. This is used to extract values for use by reducers (e.g. a derivative might need
|
||||
* the price for each bucket). If the bucket is empty, the configured GapPolicy is invoked to
|
||||
* resolve the missing bucket
|
||||
*
|
||||
* @param histo A series of agg buckets in the form of a histogram
|
||||
* @param bucket A specific bucket that a value needs to be extracted from. This bucket should be present
|
||||
* in the <code>histo</code> parameter
|
||||
* @param aggPath The path to a particular value that needs to be extracted. This path should point to a metric
|
||||
* inside the <code>bucket</code>
|
||||
* @param gapPolicy The gap policy to apply if empty buckets are found
|
||||
* @return The value extracted from <code>bucket</code> found at <code>aggPath</code>
|
||||
*/
|
||||
public static Double resolveBucketValue(InternalHistogram<? extends InternalHistogram.Bucket> histo, InternalHistogram.Bucket bucket,
|
||||
String aggPath, GapPolicy gapPolicy) {
|
||||
try {
|
||||
Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(aggPath).getPathElementsAsStringList());
|
||||
if (propertyValue == null) {
|
||||
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
|
||||
+ " must reference either a number value or a single value numeric metric aggregation");
|
||||
} else {
|
||||
double value;
|
||||
if (propertyValue instanceof Number) {
|
||||
value = ((Number) propertyValue).doubleValue();
|
||||
} else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) {
|
||||
value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value();
|
||||
} else {
|
||||
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
|
||||
+ " must reference either a number value or a single value numeric metric aggregation");
|
||||
}
|
||||
if (Double.isInfinite(value) || Double.isNaN(value)) {
|
||||
switch (gapPolicy) {
|
||||
case INSERT_ZEROS:
|
||||
return 0.0;
|
||||
case IGNORE:
|
||||
default:
|
||||
return Double.NaN;
|
||||
}
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
} catch (InvalidAggregationPathException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,10 +21,11 @@ package org.elasticsearch.search.aggregations.reducers.derivative;
|
|||
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
|
||||
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||
|
||||
public class DerivativeBuilder extends ReducerBuilder<DerivativeBuilder> {
|
||||
|
||||
private String format;
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
||||
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
||||
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
@ -33,6 +32,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||
|
||||
public class DerivativeParser implements Reducer.Parser {
|
||||
|
||||
public static final ParseField FORMAT = new ParseField("format");
|
||||
|
|
|
@ -22,38 +22,30 @@ package org.elasticsearch.search.aggregations.reducers.derivative;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.InvalidAggregationPathException;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
|
||||
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
|
||||
import org.elasticsearch.search.aggregations.reducers.Reducer;
|
||||
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
|
||||
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
|
||||
import org.elasticsearch.search.aggregations.reducers.*;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue;
|
||||
|
||||
public class DerivativeReducer extends Reducer {
|
||||
|
||||
public final static Type TYPE = new Type("derivative");
|
||||
|
@ -105,7 +97,7 @@ public class DerivativeReducer extends Reducer {
|
|||
List newBuckets = new ArrayList<>();
|
||||
Double lastBucketValue = null;
|
||||
for (InternalHistogram.Bucket bucket : buckets) {
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket);
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
||||
if (lastBucketValue != null) {
|
||||
double diff = thisBucketValue - lastBucketValue;
|
||||
|
||||
|
@ -122,40 +114,6 @@ public class DerivativeReducer extends Reducer {
|
|||
return factory.create(histo.getName(), newBuckets, histo);
|
||||
}
|
||||
|
||||
private Double resolveBucketValue(InternalHistogram<? extends InternalHistogram.Bucket> histo, InternalHistogram.Bucket bucket) {
|
||||
try {
|
||||
Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPaths()[0])
|
||||
.getPathElementsAsStringList());
|
||||
if (propertyValue == null) {
|
||||
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
|
||||
+ " must reference either a number value or a single value numeric metric aggregation");
|
||||
} else {
|
||||
double value;
|
||||
if (propertyValue instanceof Number) {
|
||||
value = ((Number) propertyValue).doubleValue();
|
||||
} else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) {
|
||||
value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value();
|
||||
} else {
|
||||
throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName()
|
||||
+ " must reference either a number value or a single value numeric metric aggregation");
|
||||
}
|
||||
if (Double.isInfinite(value) || Double.isNaN(value)) {
|
||||
switch (gapPolicy) {
|
||||
case INSERT_ZEROS:
|
||||
return 0.0;
|
||||
case IGNORE:
|
||||
default:
|
||||
return Double.NaN;
|
||||
}
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
} catch (InvalidAggregationPathException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doReadFrom(StreamInput in) throws IOException {
|
||||
formatter = ValueFormatterStreams.readOptional(in);
|
||||
|
@ -186,56 +144,4 @@ public class DerivativeReducer extends Reducer {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
public static enum GapPolicy {
|
||||
INSERT_ZEROS((byte) 0, "insert_zeros"), IGNORE((byte) 1, "ignore");
|
||||
|
||||
public static GapPolicy parse(SearchContext context, String text) {
|
||||
GapPolicy result = null;
|
||||
for (GapPolicy policy : values()) {
|
||||
if (policy.parseField.match(text)) {
|
||||
if (result == null) {
|
||||
result = policy;
|
||||
} else {
|
||||
throw new ElasticsearchIllegalStateException("Text can be parsed to 2 different gap policies: text=[" + text
|
||||
+ "], " + "policies=" + Arrays.asList(result, policy));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (result == null) {
|
||||
final List<String> validNames = new ArrayList<>();
|
||||
for (GapPolicy policy : values()) {
|
||||
validNames.add(policy.getName());
|
||||
}
|
||||
throw new SearchParseException(context, "Invalid gap policy: [" + text + "], accepted values: " + validNames);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private final byte id;
|
||||
private final ParseField parseField;
|
||||
|
||||
private GapPolicy(byte id, String name) {
|
||||
this.id = id;
|
||||
this.parseField = new ParseField(name);
|
||||
}
|
||||
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeByte(id);
|
||||
}
|
||||
|
||||
public static GapPolicy readFrom(StreamInput in) throws IOException {
|
||||
byte id = in.readByte();
|
||||
for (GapPolicy gapPolicy : values()) {
|
||||
if (id == gapPolicy.id) {
|
||||
return gapPolicy;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Unknown GapPolicy with id [" + id + "]");
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return parseField.getPreferredName();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
|||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
||||
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -509,7 +508,7 @@ public class DerivativeTests extends ElasticsearchIntegrationTest {
|
|||
.setQuery(matchAllQuery())
|
||||
.addAggregation(
|
||||
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(1).minDocCount(0)
|
||||
.subAggregation(derivative("deriv").setBucketsPaths("_count").gapPolicy(GapPolicy.INSERT_ZEROS)))
|
||||
.subAggregation(derivative("deriv").setBucketsPaths("_count").gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS)))
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(14l));
|
||||
|
|
Loading…
Reference in New Issue