Aggregations: Adding Average Bucket Aggregation
Also includes changes to the other bucket metric aggregations to share code. Closes #11006
parent 8f163ad4b0
commit 72d99773dc
@@ -130,7 +130,7 @@ count of each bucket, instead of a specific metric:
--------------------------------------------------
<1> By using `_count` instead of a metric name, we can calculate the moving average of document counts in the histogram

[[gap-policy]]
[float]
=== Dealing with gaps in the data

@@ -144,7 +144,7 @@ Where there is no data available in a bucket for a given metric it presents a pr
 the current bucket and the next bucket. The derivative reducer aggregation has a `gap_policy` parameter to define what the behavior
 should be when a gap in the data is found. There are currently two options for controlling the gap policy:

-_ignore_::
+_skip_::
 This option will not produce a derivative value for any buckets where the value in the current or previous bucket is
 missing
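For readers using the Java API, the gap policy can also be set programmatically. A minimal sketch using the builder methods introduced later in this commit (`GapPolicy.INSERT_ZEROS` is assumed to be the enum constant behind the documented `insert_zeros` option; only `GapPolicy.SKIP` appears explicitly in this diff, and `setBucketsPaths` comes from the `ReducerBuilder` base class):

[source,java]
--------------------------------------------------
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilders;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg.AvgBucketBuilder;

public class GapPolicyExample {

    public static AvgBucketBuilder avgWithZeroFilledGaps() {
        // gapPolicy(...) is inherited from the shared BucketMetricsBuilder;
        // when it is never called, the parser default (GapPolicy.SKIP) applies.
        AvgBucketBuilder builder = ReducerBuilders.avgBucket("avg_monthly_sales");
        builder.gapPolicy(GapPolicy.INSERT_ZEROS); // assumed constant for "insert_zeros"
        builder.setBucketsPaths("sales_per_month>sales");
        return builder;
    }
}
--------------------------------------------------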
@@ -154,6 +154,7 @@ _insert_zeros_::

+include::reducer/avg-bucket-aggregation.asciidoc[]
 include::reducer/derivative-aggregation.asciidoc[]
 include::reducer/max-bucket-aggregation.asciidoc[]
 include::reducer/min-bucket-aggregation.asciidoc[]

@@ -0,0 +1,100 @@
[[search-aggregations-reducer-avg-bucket-aggregation]]
=== Avg Bucket Aggregation

A sibling reducer aggregation which calculates the (mean) average value of a specified metric in a sibling aggregation.
The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.

==== Syntax

An `avg_bucket` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
    "avg_bucket": {
        "buckets_path": "the_sum"
    }
}
--------------------------------------------------

.`avg_bucket` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to find the average for (see <<bucket-path-syntax>> for more
details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details) |Optional, defaults to `skip` |
|`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` |
|===

The following snippet calculates the average of the total monthly `sales`:

[source,js]
--------------------------------------------------
{
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "avg_monthly_sales": {
            "avg_bucket": {
                "buckets_path": "sales_per_month>sales" <1>
            }
        }
    }
}
--------------------------------------------------
<1> `buckets_path` instructs this `avg_bucket` aggregation that we want the (mean) average value of the `sales` aggregation in the
`sales_per_month` date histogram.

And the following may be the response:

[source,js]
--------------------------------------------------
{
    "aggregations": {
        "sales_per_month": {
            "buckets": [
                {
                    "key_as_string": "2015/01/01 00:00:00",
                    "key": 1420070400000,
                    "doc_count": 3,
                    "sales": {
                        "value": 550
                    }
                },
                {
                    "key_as_string": "2015/02/01 00:00:00",
                    "key": 1422748800000,
                    "doc_count": 2,
                    "sales": {
                        "value": 60
                    }
                },
                {
                    "key_as_string": "2015/03/01 00:00:00",
                    "key": 1425168000000,
                    "doc_count": 2,
                    "sales": {
                        "value": 375
                    }
                }
            ]
        },
        "avg_monthly_sales": {
            "value": 328.33333333333333
        }
    }
}
--------------------------------------------------
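As a quick sanity check on the example: (550 + 60 + 375) / 3 = 985 / 3 = 328.3333..., which matches the `avg_monthly_sales` value in the response above.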
@@ -21,7 +21,11 @@ A `derivative` aggregation looks like this in isolation:
 .`derivative` Parameters
 |===
 |Parameter Name |Description |Required |Default Value
-|`buckets_path` |Path to the metric of interest (see <<bucket-path-syntax, `buckets_path` Syntax>> for more details |Required |
+|`buckets_path` |The path to the buckets we wish to find the derivative for (see <<bucket-path-syntax>> for more
+details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
+details) |Optional, defaults to `skip` |
 |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` |
 |===

@@ -22,7 +22,10 @@ A `max_bucket` aggregation looks like this in isolation:
 |===
 |Parameter Name |Description |Required |Default Value
 |`buckets_path` |The path to the buckets we wish to find the maximum for (see <<bucket-path-syntax>> for more
-details |Required |
+details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
+details) |Optional, defaults to `skip` |
 |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` |
 |===

 The following snippet calculates the maximum of the total monthly `sales`:

@@ -21,7 +21,11 @@ A `max_bucket` aggregation looks like this in isolation:
 .`min_bucket` Parameters
 |===
 |Parameter Name |Description |Required |Default Value
-|`buckets_path` |Path to the metric of interest (see <<bucket-path-syntax, `buckets_path` Syntax>> for more details |Required |
+|`buckets_path` |The path to the buckets we wish to find the minimum for (see <<bucket-path-syntax>> for more
+details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
+details) |Optional, defaults to `skip` |
 |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` |
 |===

@@ -57,6 +57,7 @@ import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
 import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg.AvgBucketParser;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketParser;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketParser;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
@@ -109,6 +110,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         reducerParsers.add(DerivativeParser.class);
         reducerParsers.add(MaxBucketParser.class);
         reducerParsers.add(MinBucketParser.class);
+        reducerParsers.add(AvgBucketParser.class);
         reducerParsers.add(MovAvgParser.class);
     }

@@ -61,6 +61,7 @@ import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
 import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg.AvgBucketReducer;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketReducer;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketReducer;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
@@ -122,6 +123,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
         InternalBucketMetricValue.registerStreams();
         MaxBucketReducer.registerStreams();
         MinBucketReducer.registerStreams();
+        AvgBucketReducer.registerStreams();
         MovAvgReducer.registerStreams();
     }

@@ -19,6 +19,7 @@

 package org.elasticsearch.search.aggregations.reducers;

+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg.AvgBucketBuilder;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketBuilder;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketBuilder;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder;
@@ -41,6 +42,10 @@ public final class ReducerBuilders {
         return new MinBucketBuilder(name);
     }

+    public static final AvgBucketBuilder avgBucket(String name) {
+        return new AvgBucketBuilder(name);
+    }
+
     public static final MovAvgBuilder movingAvg(String name) {
         return new MovAvgBuilder(name);
     }

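For reference, a minimal sketch of using the new factory method against a live client, mirroring the `histo>_count` buckets path from the `AvgBucketTests` integration test added later in this commit (the index and field names are borrowed from that test; `setBucketsPaths` comes from the `ReducerBuilder` base class):

[source,java]
--------------------------------------------------
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.avgBucket;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

public class AvgBucketUsage {

    // Averages the per-bucket document counts of the "histo" histogram.
    public static SearchResponse averageDocCounts(Client client) {
        return client.prepareSearch("idx")
                .addAggregation(histogram("histo").field("l_value").interval(5))
                .addAggregation(avgBucket("avg_bucket").setBucketsPaths("histo>_count"))
                .execute().actionGet();
    }
}
--------------------------------------------------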
@@ -0,0 +1,67 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketParser;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;

import java.io.IOException;

/**
 * A builder for building requests for a {@link BucketMetricsReducer}
 */
public abstract class BucketMetricsBuilder<B extends BucketMetricsBuilder<B>> extends ReducerBuilder<B> {

    private String format;
    private GapPolicy gapPolicy;

    public BucketMetricsBuilder(String name, String type) {
        super(name, type);
    }

    public B format(String format) {
        this.format = format;
        return (B) this;
    }

    public B gapPolicy(GapPolicy gapPolicy) {
        this.gapPolicy = gapPolicy;
        return (B) this;
    }

    @Override
    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        if (format != null) {
            builder.field(MinBucketParser.FORMAT.getPreferredName(), format);
        }
        if (gapPolicy != null) {
            builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
        }
        doInternalXContent(builder, params);
        return builder;
    }

    protected void doInternalXContent(XContentBuilder builder, Params params) {
    }

}
@@ -0,0 +1,111 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A parser for parsing requests for a {@link BucketMetricsReducer}
 */
public abstract class BucketMetricsParser implements Reducer.Parser {

    public static final ParseField FORMAT = new ParseField("format");

    public BucketMetricsParser() {
        super();
    }

    @Override
    public final ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
        XContentParser.Token token;
        String currentFieldName = null;
        String[] bucketsPaths = null;
        String format = null;
        GapPolicy gapPolicy = GapPolicy.SKIP;

        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
            } else if (doParse(reducerName, currentFieldName, token, parser, context)) {
                // Do nothing as subclass has stored the state for this token
            } else if (token == XContentParser.Token.VALUE_STRING) {
                if (FORMAT.match(currentFieldName)) {
                    format = parser.text();
                } else if (BUCKETS_PATH.match(currentFieldName)) {
                    bucketsPaths = new String[] { parser.text() };
                } else if (GAP_POLICY.match(currentFieldName)) {
                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
                } else {
                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
                            + currentFieldName + "].", parser.getTokenLocation());
                }
            } else if (token == XContentParser.Token.START_ARRAY) {
                if (BUCKETS_PATH.match(currentFieldName)) {
                    List<String> paths = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        String path = parser.text();
                        paths.add(path);
                    }
                    bucketsPaths = paths.toArray(new String[paths.size()]);
                } else {
                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
                            + currentFieldName + "].", parser.getTokenLocation());
                }
            } else {
                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
                        parser.getTokenLocation());
            }
        }

        if (bucketsPaths == null) {
            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
                    + "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
        }

        ValueFormatter formatter = null;
        if (format != null) {
            formatter = ValueFormat.Patternable.Number.format(format).formatter();
        }

        return buildFactory(reducerName, bucketsPaths, gapPolicy, formatter);
    }

    protected abstract ReducerFactory buildFactory(String reducerName, String[] bucketsPaths, GapPolicy gapPolicy,
            @Nullable ValueFormatter formatter);

    protected boolean doParse(String reducerName, String currentFieldName, Token token, XContentParser parser, SearchContext context) {
        return false;
    }

}
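None of the parsers in this commit override `doParse`, but the hook lets a subclass claim extra fields before the shared loop rejects them as unknown keys. A hypothetical sketch (the `window` field and the class are illustrative only, not part of this commit):

[source,java]
--------------------------------------------------
import java.io.IOException;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.internal.SearchContext;

public abstract class WindowedBucketMetricsParser extends BucketMetricsParser {

    @Override
    protected boolean doParse(String reducerName, String currentFieldName, Token token,
            XContentParser parser, SearchContext context) {
        if (token == Token.VALUE_NUMBER && "window".equals(currentFieldName)) {
            try {
                int window = parser.intValue(); // consume the extra field
                if (window <= 0) {
                    throw new SearchParseException(context, "[window] must be positive in ["
                            + reducerName + "]", parser.getTokenLocation());
                }
                // A real implementation would stash the value for buildFactory(...).
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return true; // handled: the base class will not treat it as an unknown key
        }
        return false; // anything else goes through the shared parsing logic
    }
}
--------------------------------------------------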
@@ -0,0 +1,127 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * A class of sibling reducers which calculate metrics across the buckets of a
 * sibling aggregation
 */
public abstract class BucketMetricsReducer extends SiblingReducer {

    protected ValueFormatter formatter;
    protected GapPolicy gapPolicy;

    public BucketMetricsReducer() {
        super();
    }

    protected BucketMetricsReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
            Map<String, Object> metaData) {
        super(name, bucketsPaths, metaData);
        this.gapPolicy = gapPolicy;
        this.formatter = formatter;
    }

    public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
        preCollection();
        List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
        for (Aggregation aggregation : aggregations) {
            if (aggregation.getName().equals(bucketsPath.get(0))) {
                bucketsPath = bucketsPath.subList(1, bucketsPath.size());
                InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
                List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
                for (int i = 0; i < buckets.size(); i++) {
                    Bucket bucket = buckets.get(i);
                    Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
                    if (bucketValue != null && !Double.isNaN(bucketValue)) {
                        collectBucketValue(bucket.getKeyAsString(), bucketValue);
                    }
                }
            }
        }
        return buildAggregation(Collections.EMPTY_LIST, metaData());
    }

    /**
     * Called before initial collection and between successive collection runs.
     * A chance to initialize or re-initialize state
     */
    protected void preCollection() {
    }

    /**
     * Called after a collection run is finished to build the aggregation for
     * the collected state.
     *
     * @param reducers
     *            the reducers to add to the resulting aggregation
     * @param metadata
     *            the metadata to add to the resulting aggregation
     * @return the aggregation built from the collected state
     */
    protected abstract InternalAggregation buildAggregation(List<Reducer> reducers, Map<String, Object> metadata);

    /**
     * Called for each bucket with a value so the state can be modified based on
     * the key and metric value for this bucket
     *
     * @param bucketKey
     *            the key for this bucket as a String
     * @param bucketValue
     *            the value of the metric specified in <code>bucketsPath</code>
     *            for this bucket
     */
    protected abstract void collectBucketValue(String bucketKey, Double bucketValue);

    @Override
    public void doReadFrom(StreamInput in) throws IOException {
        formatter = ValueFormatterStreams.readOptional(in);
        gapPolicy = GapPolicy.readFrom(in);
    }

    @Override
    public void doWriteTo(StreamOutput out) throws IOException {
        ValueFormatterStreams.writeOptional(formatter, out);
        gapPolicy.writeTo(out);
    }

}
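`AvgBucketReducer` below is the first concrete instance of this template; to illustrate how little a new metric now needs, here is a hypothetical sum-only reducer (illustrative only; a real reducer would also need stream registration and a `Factory`):

[source,java]
--------------------------------------------------
import java.util.List;
import java.util.Map;

import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsReducer;

// Hypothetical reducer that sums the selected metric across the sibling buckets.
public class SumBucketSketch extends BucketMetricsReducer {

    private static final Type TYPE = new Type("sum_bucket_sketch"); // illustrative name

    private double sum;

    @Override
    public Type type() {
        return TYPE;
    }

    @Override
    protected void preCollection() {
        sum = 0; // re-initialize state between collection runs
    }

    @Override
    protected void collectBucketValue(String bucketKey, Double bucketValue) {
        sum += bucketValue; // only non-null, non-NaN values reach this hook
    }

    @Override
    protected InternalAggregation buildAggregation(List<Reducer> reducers, Map<String, Object> metadata) {
        return new InternalSimpleValue(name(), sum, formatter, reducers, metadata);
    }
}
--------------------------------------------------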
@@ -0,0 +1,30 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg;

import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsBuilder;

public class AvgBucketBuilder extends BucketMetricsBuilder<AvgBucketBuilder> {

    public AvgBucketBuilder(String name) {
        super(name, AvgBucketReducer.TYPE.name());
    }

}
@@ -0,0 +1,38 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsParser;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;

public class AvgBucketParser extends BucketMetricsParser {

    @Override
    public String type() {
        return AvgBucketReducer.TYPE.name();
    }

    @Override
    protected ReducerFactory buildFactory(String reducerName, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter) {
        return new AvgBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
    }
}
@@ -0,0 +1,116 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.aggregations.reducers.bucketmetrics.avg;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsReducer;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class AvgBucketReducer extends BucketMetricsReducer {

    public final static Type TYPE = new Type("avg_bucket");

    public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() {
        @Override
        public AvgBucketReducer readResult(StreamInput in) throws IOException {
            AvgBucketReducer result = new AvgBucketReducer();
            result.readFrom(in);
            return result;
        }
    };

    public static void registerStreams() {
        ReducerStreams.registerStream(STREAM, TYPE.stream());
    }

    private int count = 0;
    private double sum = 0;

    private AvgBucketReducer() {
    }

    protected AvgBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
            Map<String, Object> metaData) {
        super(name, bucketsPaths, gapPolicy, formatter, metaData);
    }

    @Override
    public Type type() {
        return TYPE;
    }

    @Override
    protected void preCollection() {
        count = 0;
        sum = 0;
    }

    @Override
    protected void collectBucketValue(String bucketKey, Double bucketValue) {
        count++;
        sum += bucketValue;
    }

    @Override
    protected InternalAggregation buildAggregation(List<Reducer> reducers, Map<String, Object> metadata) {
        double avgValue = count == 0 ? Double.NaN : (sum / count);
        return new InternalSimpleValue(name(), avgValue, formatter, reducers, metadata);
    }

    public static class Factory extends ReducerFactory {

        private final ValueFormatter formatter;
        private final GapPolicy gapPolicy;

        public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter) {
            super(name, TYPE.name(), bucketsPaths);
            this.gapPolicy = gapPolicy;
            this.formatter = formatter;
        }

        @Override
        protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
            return new AvgBucketReducer(name, bucketsPaths, gapPolicy, formatter, metaData);
        }

        @Override
        public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<ReducerFactory> reducerFactories) {
            if (bucketsPaths.length != 1) {
                throw new IllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName()
                        + " must contain a single entry for reducer [" + name + "]");
            }
        }

    }

}
@@ -19,41 +19,12 @@

 package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;

-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
-import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
-import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsBuilder;

-import java.io.IOException;
-
-public class MaxBucketBuilder extends ReducerBuilder<MaxBucketBuilder> {
-
-    private String format;
-    private GapPolicy gapPolicy;
+public class MaxBucketBuilder extends BucketMetricsBuilder<MaxBucketBuilder> {

     public MaxBucketBuilder(String name) {
         super(name, MaxBucketReducer.TYPE.name());
     }

-    public MaxBucketBuilder format(String format) {
-        this.format = format;
-        return this;
-    }
-
-    public MaxBucketBuilder gapPolicy(GapPolicy gapPolicy) {
-        this.gapPolicy = gapPolicy;
-        return this;
-    }
-
-    @Override
-    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
-        if (format != null) {
-            builder.field(MaxBucketParser.FORMAT.getPreferredName(), format);
-        }
-        if (gapPolicy != null) {
-            builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
-        }
-        return builder;
-    }
-
 }
@@ -19,22 +19,12 @@

 package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;

-import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.search.SearchParseException;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
-import org.elasticsearch.search.aggregations.reducers.Reducer;
 import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
-import org.elasticsearch.search.aggregations.support.format.ValueFormat;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsParser;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;

-public class MaxBucketParser implements Reducer.Parser {
-    public static final ParseField FORMAT = new ParseField("format");
+public class MaxBucketParser extends BucketMetricsParser {

     @Override
     public String type() {
@@ -42,55 +32,7 @@ public class MaxBucketParser implements Reducer.Parser {
     }

     @Override
-    public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
-        XContentParser.Token token;
-        String currentFieldName = null;
-        String[] bucketsPaths = null;
-        String format = null;
-        GapPolicy gapPolicy = GapPolicy.SKIP;
-
-        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
-            if (token == XContentParser.Token.FIELD_NAME) {
-                currentFieldName = parser.currentName();
-            } else if (token == XContentParser.Token.VALUE_STRING) {
-                if (FORMAT.match(currentFieldName)) {
-                    format = parser.text();
-                } else if (BUCKETS_PATH.match(currentFieldName)) {
-                    bucketsPaths = new String[] { parser.text() };
-                } else if (GAP_POLICY.match(currentFieldName)) {
-                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
-                } else {
-                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
-                            + currentFieldName + "].", parser.getTokenLocation());
-                }
-            } else if (token == XContentParser.Token.START_ARRAY) {
-                if (BUCKETS_PATH.match(currentFieldName)) {
-                    List<String> paths = new ArrayList<>();
-                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
-                        String path = parser.text();
-                        paths.add(path);
-                    }
-                    bucketsPaths = paths.toArray(new String[paths.size()]);
-                } else {
-                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
-                            + currentFieldName + "].", parser.getTokenLocation());
-                }
-            } else {
-                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
-                        parser.getTokenLocation());
-            }
-        }
-
-        if (bucketsPaths == null) {
-            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
-                    + "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
-        }
-
-        ValueFormatter formatter = null;
-        if (format != null) {
-            formatter = ValueFormat.Patternable.Number.format(format).formatter();
-        }
-
+    protected ReducerFactory buildFactory(String reducerName, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter) {
         return new MaxBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
     }

@@ -21,25 +21,16 @@ package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;

 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
-import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
-import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
-import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
-import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
 import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
 import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
-import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
-import org.elasticsearch.search.aggregations.support.AggregationPath;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsReducer;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
-import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,7 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;

-public class MaxBucketReducer extends SiblingReducer {
+public class MaxBucketReducer extends BucketMetricsReducer {

     public final static Type TYPE = new Type("max_bucket");

@@ -60,21 +51,19 @@ public class MaxBucketReducer extends SiblingReducer {
         }
     };

-    private ValueFormatter formatter;
-    private GapPolicy gapPolicy;
-
     public static void registerStreams() {
         ReducerStreams.registerStream(STREAM, TYPE.stream());
     }

+    private List<String> maxBucketKeys;
+    private double maxValue;
+
     private MaxBucketReducer() {
     }

     protected MaxBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
             Map<String, Object> metaData) {
-        super(name, bucketsPaths, metaData);
-        this.gapPolicy = gapPolicy;
-        this.formatter = formatter;
+        super(name, bucketsPaths, gapPolicy, formatter, metaData);
     }

     @Override
@@ -82,46 +71,29 @@ public class MaxBucketReducer extends SiblingReducer {
         return TYPE;
     }

-    public InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
-        List<String> maxBucketKeys = new ArrayList<>();
-        double maxValue = Double.NEGATIVE_INFINITY;
-        List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
-        for (Aggregation aggregation : aggregations) {
-            if (aggregation.getName().equals(bucketsPath.get(0))) {
-                bucketsPath = bucketsPath.subList(1, bucketsPath.size());
-                InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
-                List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
-                for (int i = 0; i < buckets.size(); i++) {
-                    Bucket bucket = buckets.get(i);
-                    Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
-                    if (bucketValue != null) {
-                        if (bucketValue > maxValue) {
-                            maxBucketKeys.clear();
-                            maxBucketKeys.add(bucket.getKeyAsString());
-                            maxValue = bucketValue;
-                        } else if (bucketValue.equals(maxValue)) {
-                            maxBucketKeys.add(bucket.getKeyAsString());
-                        }
-                    }
-                }
-            }
-        }
+    @Override
+    protected void preCollection() {
+        maxBucketKeys = new ArrayList<>();
+        maxValue = Double.NEGATIVE_INFINITY;
+    }
+
+    @Override
+    protected void collectBucketValue(String bucketKey, Double bucketValue) {
+        if (bucketValue > maxValue) {
+            maxBucketKeys.clear();
+            maxBucketKeys.add(bucketKey);
+            maxValue = bucketValue;
+        } else if (bucketValue.equals(maxValue)) {
+            maxBucketKeys.add(bucketKey);
+        }
+    }
+
+    @Override
+    protected InternalAggregation buildAggregation(List<Reducer> reducers, Map<String, Object> metadata) {
         String[] keys = maxBucketKeys.toArray(new String[maxBucketKeys.size()]);
         return new InternalBucketMetricValue(name(), keys, maxValue, formatter, Collections.EMPTY_LIST, metaData());
     }

-    @Override
-    public void doReadFrom(StreamInput in) throws IOException {
-        formatter = ValueFormatterStreams.readOptional(in);
-        gapPolicy = GapPolicy.readFrom(in);
-    }
-
-    @Override
-    public void doWriteTo(StreamOutput out) throws IOException {
-        ValueFormatterStreams.writeOptional(formatter, out);
-        gapPolicy.writeTo(out);
-    }
-
     public static class Factory extends ReducerFactory {

         private final ValueFormatter formatter;

@@ -19,41 +19,13 @@

 package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;

-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
-import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
-import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsBuilder;

-import java.io.IOException;
-
-public class MinBucketBuilder extends ReducerBuilder<MinBucketBuilder> {
-
-    private String format;
-    private GapPolicy gapPolicy;
+public class MinBucketBuilder extends BucketMetricsBuilder<MinBucketBuilder> {

     public MinBucketBuilder(String name) {
         super(name, MinBucketReducer.TYPE.name());
     }

-    public MinBucketBuilder format(String format) {
-        this.format = format;
-        return this;
-    }
-
-    public MinBucketBuilder gapPolicy(GapPolicy gapPolicy) {
-        this.gapPolicy = gapPolicy;
-        return this;
-    }
-
-    @Override
-    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
-        if (format != null) {
-            builder.field(MinBucketParser.FORMAT.getPreferredName(), format);
-        }
-        if (gapPolicy != null) {
-            builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
-        }
-        return builder;
-    }
-
 }
@@ -19,79 +19,20 @@

 package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;

-import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.search.SearchParseException;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
-import org.elasticsearch.search.aggregations.reducers.Reducer;
 import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
-import org.elasticsearch.search.aggregations.support.format.ValueFormat;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsParser;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;

-public class MinBucketParser implements Reducer.Parser {
-    public static final ParseField FORMAT = new ParseField("format");
+public class MinBucketParser extends BucketMetricsParser {

     @Override
     public String type() {
         return MinBucketReducer.TYPE.name();
     }

-    @Override
-    public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
-        XContentParser.Token token;
-        String currentFieldName = null;
-        String[] bucketsPaths = null;
-        String format = null;
-        GapPolicy gapPolicy = GapPolicy.SKIP;
-
-        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
-            if (token == XContentParser.Token.FIELD_NAME) {
-                currentFieldName = parser.currentName();
-            } else if (token == XContentParser.Token.VALUE_STRING) {
-                if (FORMAT.match(currentFieldName)) {
-                    format = parser.text();
-                } else if (BUCKETS_PATH.match(currentFieldName)) {
-                    bucketsPaths = new String[] { parser.text() };
-                } else if (GAP_POLICY.match(currentFieldName)) {
-                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
-                } else {
-                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
-                            + currentFieldName + "].", parser.getTokenLocation());
-                }
-            } else if (token == XContentParser.Token.START_ARRAY) {
-                if (BUCKETS_PATH.match(currentFieldName)) {
-                    List<String> paths = new ArrayList<>();
-                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
-                        String path = parser.text();
-                        paths.add(path);
-                    }
-                    bucketsPaths = paths.toArray(new String[paths.size()]);
-                } else {
-                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
-                            + currentFieldName + "].", parser.getTokenLocation());
-                }
-            } else {
-                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
-                        parser.getTokenLocation());
-            }
-        }
-
-        if (bucketsPaths == null) {
-            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
-                    + "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
-        }
-
-        ValueFormatter formatter = null;
-        if (format != null) {
-            formatter = ValueFormat.Patternable.Number.format(format).formatter();
-        }
-
+    protected ReducerFactory buildFactory(String reducerName, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter) {
         return new MinBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
     }
-};

 }
@@ -21,25 +21,16 @@ package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;

 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
-import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
-import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
-import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
 import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
 import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
-import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.BucketMetricsReducer;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
-import org.elasticsearch.search.aggregations.support.AggregationPath;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
-import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,7 +38,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;

-public class MinBucketReducer extends SiblingReducer {
+public class MinBucketReducer extends BucketMetricsReducer {

     public final static Type TYPE = new Type("min_bucket");

@@ -60,21 +51,19 @@ public class MinBucketReducer extends SiblingReducer {
         }
     };

-    private ValueFormatter formatter;
-    private GapPolicy gapPolicy;
-
     public static void registerStreams() {
         ReducerStreams.registerStream(STREAM, TYPE.stream());
     }

+    private List<String> minBucketKeys;
+    private double minValue;
+
     private MinBucketReducer() {
     }

     protected MinBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
             Map<String, Object> metaData) {
-        super(name, bucketsPaths, metaData);
-        this.gapPolicy = gapPolicy;
-        this.formatter = formatter;
+        super(name, bucketsPaths, gapPolicy, formatter, metaData);
     }

     @Override
@@ -82,45 +71,27 @@ public class MinBucketReducer extends SiblingReducer {
         return TYPE;
     }

-    public InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
-        List<String> minBucketKeys = new ArrayList<>();
-        double minValue = Double.POSITIVE_INFINITY;
-        List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
-        for (Aggregation aggregation : aggregations) {
-            if (aggregation.getName().equals(bucketsPath.get(0))) {
-                bucketsPath = bucketsPath.subList(1, bucketsPath.size());
-                InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
-                List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
-                for (int i = 0; i < buckets.size(); i++) {
-                    Bucket bucket = buckets.get(i);
-                    Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
-                    if (bucketValue != null) {
-                        if (bucketValue < minValue) {
-                            minBucketKeys.clear();
-                            minBucketKeys.add(bucket.getKeyAsString());
-                            minValue = bucketValue;
-                        } else if (bucketValue.equals(minValue)) {
-                            minBucketKeys.add(bucket.getKeyAsString());
-                        }
-                    }
-                }
-            }
-        }
+    @Override
+    protected void preCollection() {
+        minBucketKeys = new ArrayList<>();
+        minValue = Double.POSITIVE_INFINITY;
+    }
+
+    @Override
+    protected void collectBucketValue(String bucketKey, Double bucketValue) {
+        if (bucketValue < minValue) {
+            minBucketKeys.clear();
+            minBucketKeys.add(bucketKey);
+            minValue = bucketValue;
+        } else if (bucketValue.equals(minValue)) {
+            minBucketKeys.add(bucketKey);
+        }
+    }
+
+    protected InternalAggregation buildAggregation(java.util.List<Reducer> reducers, java.util.Map<String, Object> metadata) {
         String[] keys = minBucketKeys.toArray(new String[minBucketKeys.size()]);
         return new InternalBucketMetricValue(name(), keys, minValue, formatter, Collections.EMPTY_LIST, metaData());
     }

-    @Override
-    public void doReadFrom(StreamInput in) throws IOException {
-        formatter = ValueFormatterStreams.readOptional(in);
-        gapPolicy = GapPolicy.readFrom(in);
-    }
-
-    @Override
-    public void doWriteTo(StreamOutput out) throws IOException {
-        ValueFormatterStreams.writeOptional(formatter, out);
-        gapPolicy.writeTo(out);
-    }
-};
+
     public static class Factory extends ReducerFactory {

@ -0,0 +1,399 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.reducers;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
||||
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
||||
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.avgBucket;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.core.IsNull.notNullValue;
|
||||
|
||||
@ElasticsearchIntegrationTest.SuiteScopeTest
|
||||
public class AvgBucketTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
|
||||
|
||||
static int numDocs;
|
||||
static int interval;
|
||||
static int minRandomValue;
|
||||
static int maxRandomValue;
|
||||
static int numValueBuckets;
|
||||
static long[] valueCounts;
|
||||
|
||||
    @Override
    public void setupSuiteScopeCluster() throws Exception {
        createIndex("idx");
        createIndex("idx_unmapped");

        numDocs = randomIntBetween(6, 20);
        interval = randomIntBetween(2, 5);

        minRandomValue = 0;
        maxRandomValue = 20;

        numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
        valueCounts = new long[numValueBuckets];

        List<IndexRequestBuilder> builders = new ArrayList<>();

        for (int i = 0; i < numDocs; i++) {
            int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
            builders.add(client().prepareIndex("idx", "type").setSource(
                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
                            .endObject()));
            final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
            valueCounts[bucket]++;
        }

        assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
        for (int i = 0; i < 2; i++) {
            builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
        }
        indexRandom(true, builders);
        ensureSearchable();
    }

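    // avg_bucket over "histo>_count" at the top level: the reducer output must equal
    // the mean of the per-bucket document counts computed by hand from valueCounts.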
    @Test
    public void testDocCount_topLevel() throws Exception {
        SearchResponse response = client().prepareSearch("idx")
                .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
                        .extendedBounds((long) minRandomValue, (long) maxRandomValue))
                .addAggregation(avgBucket("avg_bucket").setBucketsPaths("histo>_count")).execute().actionGet();

        assertSearchResponse(response);

        Histogram histo = response.getAggregations().get("histo");
        assertThat(histo, notNullValue());
        assertThat(histo.getName(), equalTo("histo"));
        List<? extends Bucket> buckets = histo.getBuckets();
        assertThat(buckets.size(), equalTo(numValueBuckets));

        double sum = 0;
        int count = 0;
        for (int i = 0; i < numValueBuckets; ++i) {
            Histogram.Bucket bucket = buckets.get(i);
            assertThat(bucket, notNullValue());
            assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
            assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
            count++;
            sum += bucket.getDocCount();
        }

        double avgValue = count == 0 ? Double.NaN : (sum / count);
        InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket");
        assertThat(avgBucketValue, notNullValue());
        assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
        assertThat(avgBucketValue.value(), equalTo(avgValue));
    }

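    // Same document-count check, but with avg_bucket as a sibling of a histogram nested
    // inside a terms aggregation, so one average is produced per terms bucket.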
    @Test
    public void testDocCount_asSubAgg() throws Exception {
        SearchResponse response = client()
                .prepareSearch("idx")
                .addAggregation(
                        terms("terms")
                                .field("tag")
                                .order(Order.term(true))
                                .subAggregation(
                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
                                .subAggregation(avgBucket("avg_bucket").setBucketsPaths("histo>_count"))).execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> termsBuckets = terms.getBuckets();
        assertThat(termsBuckets.size(), equalTo(interval));

        for (int i = 0; i < interval; ++i) {
            Terms.Bucket termsBucket = termsBuckets.get(i);
            assertThat(termsBucket, notNullValue());
            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));

            Histogram histo = termsBucket.getAggregations().get("histo");
            assertThat(histo, notNullValue());
            assertThat(histo.getName(), equalTo("histo"));
            List<? extends Bucket> buckets = histo.getBuckets();

            double sum = 0;
            int count = 0;
            for (int j = 0; j < numValueBuckets; ++j) {
                Histogram.Bucket bucket = buckets.get(j);
                assertThat(bucket, notNullValue());
                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
                count++;
                sum += bucket.getDocCount();
            }

            double avgValue = count == 0 ? Double.NaN : (sum / count);
            InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket");
            assertThat(avgBucketValue, notNullValue());
            assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
            assertThat(avgBucketValue.value(), equalTo(avgValue));
        }
    }

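    // avg_bucket over a real metric ("terms>sum"): the result must equal the mean of
    // the per-bucket sum values.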
    @Test
    public void testMetric_topLevel() throws Exception {
        SearchResponse response = client()
                .prepareSearch("idx")
                .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
                .addAggregation(avgBucket("avg_bucket").setBucketsPaths("terms>sum")).execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> buckets = terms.getBuckets();
        assertThat(buckets.size(), equalTo(interval));

        double bucketSum = 0;
        int count = 0;
        for (int i = 0; i < interval; ++i) {
            Terms.Bucket bucket = buckets.get(i);
            assertThat(bucket, notNullValue());
            assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
            assertThat(bucket.getDocCount(), greaterThan(0L));
            Sum sum = bucket.getAggregations().get("sum");
            assertThat(sum, notNullValue());
            count++;
            bucketSum += sum.value();
        }

        double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
        InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket");
        assertThat(avgBucketValue, notNullValue());
        assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
        assertThat(avgBucketValue.value(), equalTo(avgValue));
    }

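    // Metric variant nested under terms. The expected average counts only non-empty
    // histogram buckets, matching the default `skip` gap policy.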
    @Test
    public void testMetric_asSubAgg() throws Exception {
        SearchResponse response = client()
                .prepareSearch("idx")
                .addAggregation(
                        terms("terms")
                                .field("tag")
                                .order(Order.term(true))
                                .subAggregation(
                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue)
                                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
                                .subAggregation(avgBucket("avg_bucket").setBucketsPaths("histo>sum"))).execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> termsBuckets = terms.getBuckets();
        assertThat(termsBuckets.size(), equalTo(interval));

        for (int i = 0; i < interval; ++i) {
            Terms.Bucket termsBucket = termsBuckets.get(i);
            assertThat(termsBucket, notNullValue());
            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));

            Histogram histo = termsBucket.getAggregations().get("histo");
            assertThat(histo, notNullValue());
            assertThat(histo.getName(), equalTo("histo"));
            List<? extends Bucket> buckets = histo.getBuckets();

            double bucketSum = 0;
            int count = 0;
            for (int j = 0; j < numValueBuckets; ++j) {
                Histogram.Bucket bucket = buckets.get(j);
                assertThat(bucket, notNullValue());
                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
                if (bucket.getDocCount() != 0) {
                    Sum sum = bucket.getAggregations().get("sum");
                    assertThat(sum, notNullValue());
                    count++;
                    bucketSum += sum.value();
                }
            }

            double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
            InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket");
            assertThat(avgBucketValue, notNullValue());
            assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
            assertThat(avgBucketValue.value(), equalTo(avgValue));
        }
    }

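    // With GapPolicy.INSERT_ZEROS the reducer treats gaps as zeros, so every histogram
    // bucket contributes to the expected average (an empty bucket's sum is 0).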
    @Test
    public void testMetric_asSubAggWithInsertZeros() throws Exception {
        SearchResponse response = client()
                .prepareSearch("idx")
                .addAggregation(
                        terms("terms")
                                .field("tag")
                                .order(Order.term(true))
                                .subAggregation(
                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue)
                                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
                                .subAggregation(avgBucket("avg_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)))
                .execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> termsBuckets = terms.getBuckets();
        assertThat(termsBuckets.size(), equalTo(interval));

        for (int i = 0; i < interval; ++i) {
            Terms.Bucket termsBucket = termsBuckets.get(i);
            assertThat(termsBucket, notNullValue());
            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));

            Histogram histo = termsBucket.getAggregations().get("histo");
            assertThat(histo, notNullValue());
            assertThat(histo.getName(), equalTo("histo"));
            List<? extends Bucket> buckets = histo.getBuckets();

            double bucketSum = 0;
            int count = 0;
            for (int j = 0; j < numValueBuckets; ++j) {
                Histogram.Bucket bucket = buckets.get(j);
                assertThat(bucket, notNullValue());
                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
                Sum sum = bucket.getAggregations().get("sum");
                assertThat(sum, notNullValue());

                count++;
                bucketSum += sum.value();
            }

            double avgValue = count == 0 ? Double.NaN : (bucketSum / count);
            InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket");
            assertThat(avgBucketValue, notNullValue());
            assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
            assertThat(avgBucketValue.value(), equalTo(avgValue));
        }
    }

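    // When the terms aggregation matches nothing (every term excluded), there are no
    // buckets to average and the reducer returns NaN.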
    @Test
    public void testNoBuckets() throws Exception {
        SearchResponse response = client().prepareSearch("idx")
                .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
                .addAggregation(avgBucket("avg_bucket").setBucketsPaths("terms>sum")).execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> buckets = terms.getBuckets();
        assertThat(buckets.size(), equalTo(0));

        InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket");
        assertThat(avgBucketValue, notNullValue());
        assertThat(avgBucketValue.getName(), equalTo("avg_bucket"));
        assertThat(avgBucketValue.value(), equalTo(Double.NaN));
    }

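    // avg_bucket targeting another avg_bucket: avg_terms_bucket averages the
    // avg_histo_bucket values computed inside each terms bucket.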
    @Test
    public void testNested() throws Exception {
        SearchResponse response = client()
                .prepareSearch("idx")
                .addAggregation(
                        terms("terms")
                                .field("tag")
                                .order(Order.term(true))
                                .subAggregation(
                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
                                .subAggregation(avgBucket("avg_histo_bucket").setBucketsPaths("histo>_count")))
                .addAggregation(avgBucket("avg_terms_bucket").setBucketsPaths("terms>avg_histo_bucket")).execute().actionGet();

        assertSearchResponse(response);

        Terms terms = response.getAggregations().get("terms");
        assertThat(terms, notNullValue());
        assertThat(terms.getName(), equalTo("terms"));
        List<Terms.Bucket> termsBuckets = terms.getBuckets();
        assertThat(termsBuckets.size(), equalTo(interval));

        double aggTermsSum = 0;
        int aggTermsCount = 0;
        for (int i = 0; i < interval; ++i) {
            Terms.Bucket termsBucket = termsBuckets.get(i);
            assertThat(termsBucket, notNullValue());
            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));

            Histogram histo = termsBucket.getAggregations().get("histo");
            assertThat(histo, notNullValue());
            assertThat(histo.getName(), equalTo("histo"));
            List<? extends Bucket> buckets = histo.getBuckets();

            double aggHistoSum = 0;
            int aggHistoCount = 0;
            for (int j = 0; j < numValueBuckets; ++j) {
                Histogram.Bucket bucket = buckets.get(j);
                assertThat(bucket, notNullValue());
                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));

                aggHistoCount++;
                aggHistoSum += bucket.getDocCount();
            }

            double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount);
            InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_histo_bucket");
            assertThat(avgBucketValue, notNullValue());
            assertThat(avgBucketValue.getName(), equalTo("avg_histo_bucket"));
            assertThat(avgBucketValue.value(), equalTo(avgHistoValue));

            aggTermsCount++;
            aggTermsSum += avgHistoValue;
        }

        double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount);
        InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_terms_bucket");
        assertThat(avgBucketValue, notNullValue());
        assertThat(avgBucketValue.getName(), equalTo("avg_terms_bucket"));
        assertThat(avgBucketValue.value(), equalTo(avgTermsValue));
    }
}