Fix MaxBucketReducer to use gapPolicy

Also moved gapPolicy and format ParseField constants to common class
This commit is contained in:
Colin Goodheart-Smithe 2015-04-23 10:44:23 +01:00
parent 1a1ddceb47
commit 0ff4827e55
7 changed files with 93 additions and 15 deletions

View File

@ -47,6 +47,9 @@ public abstract class Reducer implements Streamable {
public static final ParseField BUCKETS_PATH = new ParseField("buckets_path");
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
/**
* @return The reducer type this parser is associated with.
*/

View File

@ -20,13 +20,16 @@
package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import java.io.IOException;
public class MaxBucketBuilder extends ReducerBuilder<MaxBucketBuilder> {
private String format;
private GapPolicy gapPolicy;
public MaxBucketBuilder(String name) {
super(name, MaxBucketReducer.TYPE.name());
@ -37,11 +40,19 @@ public class MaxBucketBuilder extends ReducerBuilder<MaxBucketBuilder> {
return this;
}
public MaxBucketBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(MaxBucketParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
return builder;
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
@ -46,6 +47,7 @@ public class MaxBucketParser implements Reducer.Parser {
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.IGNORE;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -55,6 +57,8 @@ public class MaxBucketParser implements Reducer.Parser {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text());
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
@ -86,7 +90,7 @@ public class MaxBucketParser implements Reducer.Parser {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}
return new MaxBucketReducer.Factory(reducerName, bucketsPaths, formatter);
return new MaxBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
}
}

View File

@ -61,6 +61,7 @@ public class MaxBucketReducer extends SiblingReducer {
};
private ValueFormatter formatter;
private GapPolicy gapPolicy;
public static void registerStreams() {
ReducerStreams.registerStream(STREAM, TYPE.stream());
@ -69,8 +70,10 @@ public class MaxBucketReducer extends SiblingReducer {
private MaxBucketReducer() {
}
protected MaxBucketReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, Map<String, Object> metaData) {
protected MaxBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
}
@ -90,7 +93,7 @@ public class MaxBucketReducer extends SiblingReducer {
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Bucket bucket = buckets.get(i);
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, GapPolicy.IGNORE);
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
if (bucketValue != null) {
if (bucketValue > maxValue) {
maxBucketKeys.clear();
@ -110,25 +113,29 @@ public class MaxBucketReducer extends SiblingReducer {
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
}
public static class Factory extends ReducerFactory {
private final ValueFormatter formatter;
private final GapPolicy gapPolicy;
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) {
public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter) {
super(name, TYPE.name(), bucketsPaths);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
}
@Override
protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
return new MaxBucketReducer(name, bucketsPaths, formatter, metaData);
return new MaxBucketReducer(name, bucketsPaths, gapPolicy, formatter, metaData);
}
@Override

View File

@ -19,9 +19,9 @@
package org.elasticsearch.search.aggregations.reducers.derivative;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
@ -32,13 +32,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
public class DerivativeParser implements Reducer.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
@Override
public String type() {
return DerivativeReducer.TYPE.name();

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel;
@ -37,12 +38,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
public class MovAvgParser implements Reducer.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField MODEL = new ParseField("model");
public static final ParseField WINDOW = new ParseField("window");
public static final ParseField SETTINGS = new ParseField("settings");

View File

@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -244,6 +245,66 @@ public class MaxBucketTests extends ElasticsearchIntegrationTest {
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> maxKeys = new ArrayList<>();
double maxValue = Double.NEGATIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
if (sum.value() > maxValue) {
maxValue = sum.value();
maxKeys = new ArrayList<>();
maxKeys.add(bucket.getKeyAsString());
} else if (sum.value() == maxValue) {
maxKeys.add(bucket.getKeyAsString());
}
}
}
InternalBucketMetricValue maxBucketValue = termsBucket.getAggregations().get("max_bucket");
assertThat(maxBucketValue, notNullValue());
assertThat(maxBucketValue.getName(), equalTo("max_bucket"));
assertThat(maxBucketValue.value(), equalTo(maxValue));
assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()])));
}
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(maxBucket("max_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());