[7.x] Histogram field type support for ValueCount and Avg aggregations (#56099)

Backports #55933 to 7.x

Implements value_count and avg aggregations over Histogram fields as discussed in #53285

- `value_count` returns the sum of all elements in the `counts` arrays of the histograms
- `avg` computes a weighted average over the `values` array of each histogram, weighting each value by its associated element in the `counts` array (see the sketch after this list)
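For illustration, here is a minimal sketch of the arithmetic over a histogram's parallel `values`/`counts` arrays; the class and method names are hypothetical, not part of this change:

// Illustrative only: how value_count and avg fall out of a histogram's
// parallel values/counts arrays.
class HistogramMetricsSketch {

    // value_count: the total of every entry in the counts array
    static long valueCount(long[] counts) {
        long total = 0;
        for (long c : counts) {
            total += c;
        }
        return total;
    }

    // avg: each value weighted by its count, divided by the total count
    static double avg(double[] values, long[] counts) {
        double weightedSum = 0;
        long totalCount = 0;
        for (int i = 0; i < values.length; i++) {
            weightedSum += values[i] * counts[i];
            totalCount += counts[i];
        }
        return weightedSum / totalCount;
    }
}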
Christos Soulios 2020-05-04 13:23:02 +03:00 committed by GitHub
parent 0860d1dc74
commit c65f828cb7
19 changed files with 715 additions and 49 deletions

View File

@@ -126,3 +126,57 @@ POST /exams/_search?size=0
// TEST[setup:exams]
<1> Documents without a value in the `grade` field will fall into the same bucket as documents that have the value `10`.
[[search-aggregations-metrics-avg-aggregation-histogram-fields]]
==== Histogram fields
When an average is computed on <<histogram,histogram fields>>, the result of the aggregation is the weighted average
of all elements in the `values` array, each weighted by the number in the same position in the `counts` array.
For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:
[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
"network.name" : "net-1",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
"counts" : [3, 7, 23, 12, 6] <2>
}
}
PUT metrics_index/_doc/2
{
"network.name" : "net-2",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
"counts" : [8, 17, 8, 7, 6] <2>
}
}
POST /metrics_index/_search?size=0
{
"aggs" : {
"avg_latency" :
{ "avg" : { "field" : "latency_histo" }
}
}
}
--------------------------------------------------
For each histogram field, the `avg` aggregation multiplies each number in the `values` array <1> by its associated count
in the `counts` array <2>, sums these products across all histograms, and divides by the total of all counts. For the documents
above this is (16.4 + 12.4) / (51 + 46) = 28.8 / 97 ≈ 0.2969, so the request returns the following result:
[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"avg_latency" : {
"value" : 0.29690721649
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]

View File

@@ -163,10 +163,10 @@ POST /sales/_search?size=0
[[search-aggregations-metrics-sum-aggregation-histogram-fields]]
==== Histogram fields
-When the sums are computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
+When sum is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
array multiplied by the number in the same position in the `counts` array.
-For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks:
+For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:
[source,console]
--------------------------------------------------
@@ -196,7 +196,7 @@ POST /metrics_index/_search?size=0
}
--------------------------------------------------
-For each histogram field the sum aggregation will multiply each number in the `values` array <1> multiplied with its associated count
+For each histogram field, the `sum` aggregation multiplies each number in the `values` array <1> by its associated count
in the `counts` array <2>. It then adds these products across all histograms and returns the following result:
[source,console-result]

View File

@@ -6,6 +6,9 @@ These values can be extracted either from specific fields in the documents, or b
this aggregator will be used in conjunction with other single-value aggregations. For example, when computing the `avg`
one might be interested in the number of values the average is computed over.
`value_count` does not de-duplicate values, so even if a field has duplicates (or a script generates multiple
identical values for a single document), each value will be counted individually.
[source,console]
--------------------------------------------------
POST /sales/_search?size=0
@@ -77,3 +80,60 @@ POST /sales/_search?size=0
}
--------------------------------------------------
// TEST[setup:sales,stored_example_script]
NOTE: Because `value_count` is designed to work with any field, it internally treats all values as simple bytes.
Due to this implementation, if the `_value` script variable is used to fetch a value instead of accessing the field
directly (e.g. a "value script"), the field value will be returned as a string instead of its native format.
[[search-aggregations-metrics-valuecount-aggregation-histogram-fields]]
==== Histogram fields
When the `value_count` aggregation is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all numbers
in the `counts` array of the histogram.
For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:
[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
"network.name" : "net-1",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5],
"counts" : [3, 7, 23, 12, 6] <1>
}
}
PUT metrics_index/_doc/2
{
"network.name" : "net-2",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5],
"counts" : [8, 17, 8, 7, 6] <1>
}
}
POST /metrics_index/_search?size=0
{
"aggs" : {
"total_requests" : {
"value_count" : { "field" : "latency_histo" }
}
}
}
--------------------------------------------------
For each histogram field, the `value_count` aggregation sums all numbers in the `counts` array <1>.
It then adds those sums across all histograms (here 51 + 46 = 97) and returns the following result:
[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"total_requests" : {
"value" : 97
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]

View File

@@ -36,6 +36,8 @@ Because the data is not indexed, you only can use `histogram` fields for the
following aggregations and queries:
* <<search-aggregations-metrics-sum-aggregation-histogram-fields,sum>> aggregation
* <<search-aggregations-metrics-valuecount-aggregation-histogram-fields,value_count>> aggregation
* <<search-aggregations-metrics-avg-aggregation-histogram-fields,avg>> aggregation
* <<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation
* <<search-aggregations-metrics-percentile-rank-aggregation,percentile ranks>> aggregation
* <<search-aggregations-metrics-boxplot-aggregation,boxplot>> aggregation

View File

@@ -34,7 +34,7 @@ import java.util.Objects;
public class InternalValueCount extends InternalNumericMetricsAggregation.SingleValue implements ValueCount {
private final long value;
-InternalValueCount(String name, long value, Map<String, Object> metadata) {
+public InternalValueCount(String name, long value, Map<String, Object> metadata) {
super(name, metadata);
this.value = value;
}

View File

@@ -52,7 +52,6 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -86,7 +85,7 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
@Override
public List<AggregationSpec> getAggregations() {
-return Arrays.asList(
+return org.elasticsearch.common.collect.List.of(
new AggregationSpec(
StringStatsAggregationBuilder.NAME,
StringStatsAggregationBuilder::new,
@@ -143,10 +142,12 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
-return Arrays.asList(
+return org.elasticsearch.common.collect.List.of(
AnalyticsAggregatorFactory::registerPercentilesAggregator,
AnalyticsAggregatorFactory::registerPercentileRanksAggregator,
-AnalyticsAggregatorFactory::registerHistoBackedSumAggregator
+AnalyticsAggregatorFactory::registerHistoBackedSumAggregator,
+AnalyticsAggregatorFactory::registerHistoBackedValueCountAggregator,
+AnalyticsAggregatorFactory::registerHistoBackedAverageAggregator
);
}
@@ -160,7 +161,7 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
-return Arrays.asList(
+return org.elasticsearch.common.collect.List.of(
new NamedWriteableRegistry.Entry(TTestState.class, PairedTTestState.NAME, PairedTTestState::new),
new NamedWriteableRegistry.Entry(TTestState.class, UnpairedTTestState.NAME, UnpairedTTestState::new)
);

View File

@@ -3,9 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
@@ -13,8 +13,11 @@ import org.elasticsearch.search.aggregations.metrics.PercentilesAggregatorSuppli
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
public class AnalyticsAggregatorFactory {
@@ -65,6 +68,24 @@ public class AnalyticsAggregatorFactory {
public static void registerHistoBackedSumAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(SumAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
-(MetricAggregatorSupplier) HistoBackedSumAggregator::new);
+(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
+new HistoBackedSumAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
+);
}
public static void registerHistoBackedValueCountAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(ValueCountAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(ValueCountAggregatorSupplier) (name, valuesSource, context, parent, metadata) ->
new HistoBackedValueCountAggregator(name, (HistogramValuesSource.Histogram) valuesSource, context, parent, metadata)
);
}
public static void registerHistoBackedAverageAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(AvgAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
new HistoBackedAvgAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
);
}
}

View File

@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
import java.io.IOException;
import java.util.Map;
/**
* Average aggregator operating over histogram datatypes {@link HistogramValuesSource}.
* The aggregation computes a weighted average, weighting each value in the histogram by its associated count.
*/
class HistoBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {
private final HistogramValuesSource.Histogram valuesSource;
LongArray counts;
DoubleArray sums;
DoubleArray compensations;
DocValueFormat format;
HistoBackedAvgAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
this.format = formatter;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
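// Per-bucket accumulators; grown lazily in the leaf collector and released in doClose()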
counts = bigArrays.newLongArray(1, true);
sums = bigArrays.newDoubleArray(1, true);
compensations = bigArrays.newDoubleArray(1, true);
}
}
@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final HistogramValues values = valuesSource.getHistogramValues(ctx);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
counts = bigArrays.grow(counts, bucket + 1);
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
// Compute the sum of double values with Kahan summation algorithm which is more accurate than naive summation
final double sum = sums.get(bucket);
final double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
while (sketch.next()) {
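// Weight each centroid's value by its count; counts grows by the same total, so metric() divides a weighted sum by the total count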
double d = sketch.value() * sketch.count();
kahanSummation.add(d);
counts.increment(bucket, sketch.count());
}
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}
}
};
}
@Override
public double metric(long owningBucketOrd) {
if (valuesSource == null || owningBucketOrd >= sums.size()) {
return Double.NaN;
}
return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
}
@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= sums.size()) {
return buildEmptyAggregation();
}
return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalAvg(name, 0.0, 0L, format, metadata());
}
@Override
public void doClose() {
Releasables.close(counts, sums, compensations);
}
}
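As an aside, the `sums`/`compensations` pair above implements compensated (Kahan) summation, which `CompensatedSum` encapsulates. A minimal standalone sketch of the idea, with illustrative names only:

// Kahan summation: carry the low-order bits lost by each floating-point
// addition in a compensation term so long running sums stay accurate.
final class KahanSketch {
    private double value; // running sum
    private double delta; // running compensation (lost low-order bits)

    void add(double v) {
        double corrected = v - delta;       // re-apply previously lost bits
        double next = value + corrected;    // may again lose low-order bits
        delta = (next - value) - corrected; // recover what was lost this step
        value = next;
    }

    double value() {
        return value;
    }
}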

View File

@@ -20,7 +20,6 @@ import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
@@ -29,16 +28,19 @@ import java.util.Map;
/**
* Sum aggregator operating over histogram datatypes {@link HistogramValuesSource}
*
* The aggregator sums each histogram value multiplied by its count.
* Eg for a histogram of response times, this is an approximate "total time spent".
*/
class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue {
-private final ValuesSource valuesSource;
+private final HistogramValuesSource.Histogram valuesSource;
private final DocValueFormat format;
private DoubleArray sums;
private DoubleArray compensations;
-HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context,
+HistoBackedSumAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
@@ -61,7 +63,7 @@ class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
-final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
+final HistogramValues values = valuesSource.getHistogramValues(ctx);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {

View File

@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
import java.io.IOException;
import java.util.Map;
/**
* Value count aggregator operating over histogram datatypes {@link HistogramValuesSource}
* The aggregation counts the number of values a histogram field has within the aggregation context
* by adding the counts of the histograms.
*/
public class HistoBackedValueCountAggregator extends NumericMetricsAggregator.SingleValue {
final HistogramValuesSource.Histogram valuesSource;
/** Count per bucket */
LongArray counts;
public HistoBackedValueCountAggregator(
String name,
HistogramValuesSource.Histogram valuesSource,
SearchContext aggregationContext,
Aggregator parent,
Map<String, Object> metadata) throws IOException {
super(name, aggregationContext, parent, metadata);
this.valuesSource = valuesSource;
if (valuesSource != null) {
counts = context.bigArrays().newLongArray(1, true);
}
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final HistogramValues values = valuesSource.getHistogramValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
counts = bigArrays.grow(counts, bucket + 1);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while (sketch.next()) {
counts.increment(bucket, sketch.count());
}
}
}
};
}
@Override
public double metric(long owningBucketOrd) {
return (valuesSource == null || owningBucketOrd >= counts.size()) ? 0 : counts.get(owningBucketOrd);
}
@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= counts.size()) {
return buildEmptyAggregation();
}
return new InternalValueCount(name, counts.get(bucket), metadata());
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalValueCount(name, 0L, metadata());
}
@Override
public void doClose() {
Releasables.close(counts);
}
}

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;
@@ -31,6 +31,7 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import org.hamcrest.Matchers;
import java.io.IOException;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;
@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import java.io.IOException;
import java.util.Arrays;

View File

@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import static java.util.Collections.singleton;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
public class HistoBackedAvgAggregatorTests extends AggregatorTestCase {
private static final String FIELD_NAME = "field";
public void testNoDocs() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
// Intentionally not writing any docs
}, avg -> {
assertEquals(Double.NaN, avg.getValue(), 0d);
assertFalse(AggregationInspectionHelper.hasValue(avg));
});
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("wrong_field", new double[] {3, 1.2, 10})));
iw.addDocument(singleton(getDocValue("wrong_field", new double[] {5.3, 6, 20})));
}, avg -> {
assertEquals(Double.NaN, avg.getValue(), 0d);
assertFalse(AggregationInspectionHelper.hasValue(avg));
});
}
public void testSimpleHistogram() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})));
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {5.3, 6, 6, 20})));
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90})));
}, avg -> {
assertEquals(12.0463d, avg.getValue(), 0.01d);
assertTrue(AggregationInspectionHelper.hasValue(avg));
});
}
public void testQueryFiltering() throws IOException {
testCase(new TermQuery(new Term("match", "yes")), iw -> {
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {5.3, 6, 20}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "no", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "no", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90}))
);
}, avg -> {
assertEquals(12.651d, avg.getValue(), 0.01d);
assertTrue(AggregationInspectionHelper.hasValue(avg));
});
}
private void testCase(Query query,
CheckedConsumer<RandomIndexWriter, IOException> indexer,
Consumer<InternalAvg> verify) throws IOException {
testCase(avg("_name").field(FIELD_NAME), query, indexer, verify, defaultFieldType(FIELD_NAME));
}
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
TDigest histogram = new TDigestState(100.0); // default compression
for (double value : values) {
histogram.add(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
Collection<Centroid> centroids = histogram.centroids();
Iterator<Centroid> iterator = centroids.iterator();
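// Each centroid is serialized as a VInt count followed by a double mean, mirroring the histogram field's binary doc-values layout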
while (iterator.hasNext()) {
Centroid centroid = iterator.next();
streamOutput.writeVInt(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
@Override
protected List<SearchPlugin> getSearchPlugins() {
return org.elasticsearch.common.collect.List.of(new AnalyticsPlugin(Settings.EMPTY));
}
@Override
protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
// Note: this is the same list as Core, plus Analytics
return org.elasticsearch.common.collect.List.of(
CoreValuesSourceType.NUMERIC,
CoreValuesSourceType.DATE,
CoreValuesSourceType.BOOLEAN,
AnalyticsValuesSourceType.HISTOGRAM
);
}
@Override
protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
return new AvgAggregationBuilder("_name").field(fieldName);
}
private MappedFieldType defaultFieldType(String fieldName) {
MappedFieldType fieldType = new HistogramFieldMapper.Builder(fieldName).fieldType();
fieldType.setName(fieldName);
return fieldType;
}
}

View File

@@ -10,15 +10,11 @@ import com.tdunning.math.stats.TDigest;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -111,27 +107,7 @@ public class HistoBackedSumAggregatorTests extends AggregatorTestCase {
private void testCase(Query query,
CheckedConsumer<RandomIndexWriter, IOException> indexer,
Consumer<InternalSum> verify) throws IOException {
-testCase(query, sum("_name").field(FIELD_NAME), indexer, verify, singleton(defaultFieldType(FIELD_NAME)));
}
-private void testCase(Query query,
-SumAggregationBuilder aggregationBuilder,
-CheckedConsumer<RandomIndexWriter, IOException> indexer,
-Consumer<InternalSum> verify,
-Collection<MappedFieldType> fieldTypes) throws IOException {
-try (Directory directory = newDirectory()) {
-try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
-indexer.accept(indexWriter);
-}
-try (IndexReader indexReader = DirectoryReader.open(directory)) {
-IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-final MappedFieldType[] fieldTypesArray = fieldTypes.toArray(new MappedFieldType[0]);
-final InternalSum internalSum = search(indexSearcher, query, aggregationBuilder, fieldTypesArray);
-verify.accept(internalSum);
-}
-}
testCase(sum("_name").field(FIELD_NAME), query, indexer, verify, defaultFieldType(FIELD_NAME));
}
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
@@ -153,13 +129,13 @@ public class HistoBackedSumAggregatorTests extends AggregatorTestCase {
@Override
protected List<SearchPlugin> getSearchPlugins() {
-return Arrays.asList(new AnalyticsPlugin(Settings.EMPTY));
+return org.elasticsearch.common.collect.List.of(new AnalyticsPlugin(Settings.EMPTY));
}
@Override
protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
// Note: this is the same list as Core, plus Analytics
-return Arrays.asList(
+return org.elasticsearch.common.collect.List.of(
CoreValuesSourceType.NUMERIC,
CoreValuesSourceType.DATE,
CoreValuesSourceType.BOOLEAN,

View File

@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import static java.util.Collections.singleton;
import static org.elasticsearch.search.aggregations.AggregationBuilders.count;
public class HistoBackedValueCountAggregatorTests extends AggregatorTestCase {
private static final String FIELD_NAME = "field";
public void testNoDocs() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
// Intentionally not writing any docs
}, count -> {
assertEquals(0L, count.getValue(), 0d);
assertFalse(AggregationInspectionHelper.hasValue(count));
});
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("wrong_field", new double[] {3, 1.2, 10})));
iw.addDocument(singleton(getDocValue("wrong_field", new double[] {5.3, 6, 20})));
}, count -> {
assertEquals(0L, count.getValue());
assertFalse(AggregationInspectionHelper.hasValue(count));
});
}
public void testSimpleHistogram() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})));
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {5.3, 6, 6, 20})));
iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90})));
}, count -> {
assertEquals(11, count.getValue());
assertTrue(AggregationInspectionHelper.hasValue(count));
});
}
public void testQueryFiltering() throws IOException {
testCase(new TermQuery(new Term("match", "yes")), iw -> {
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {5.3, 6, 20}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "no", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "no", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
);
iw.addDocument(Arrays.asList(
new StringField("match", "yes", Field.Store.NO),
getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90}))
);
}, count -> {
assertEquals(10, count.getValue());
assertTrue(AggregationInspectionHelper.hasValue(count));
});
}
private void testCase(
Query query,
CheckedConsumer<RandomIndexWriter, IOException> indexer,
Consumer<InternalValueCount> verify) throws IOException {
testCase(count("_name").field(FIELD_NAME), query, indexer, verify, defaultFieldType(FIELD_NAME));
}
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
TDigest histogram = new TDigestState(100.0); // default compression
for (double value : values) {
histogram.add(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
Collection<Centroid> centroids = histogram.centroids();
Iterator<Centroid> iterator = centroids.iterator();
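// Each centroid is serialized as a VInt count followed by a double mean, mirroring the histogram field's binary doc-values layout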
while (iterator.hasNext()) {
Centroid centroid = iterator.next();
streamOutput.writeVInt(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
@Override
protected List<SearchPlugin> getSearchPlugins() {
return org.elasticsearch.common.collect.List.of(new AnalyticsPlugin(Settings.EMPTY));
}
@Override
protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
// Note: this is the same list as Core, plus Analytics
return org.elasticsearch.common.collect.List.of(
CoreValuesSourceType.NUMERIC,
CoreValuesSourceType.DATE,
CoreValuesSourceType.BOOLEAN,
CoreValuesSourceType.BYTES,
CoreValuesSourceType.IP,
CoreValuesSourceType.GEOPOINT,
CoreValuesSourceType.RANGE,
AnalyticsValuesSourceType.HISTOGRAM
);
}
@Override
protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
return new ValueCountAggregationBuilder("_name").field(fieldName);
}
private MappedFieldType defaultFieldType(String fieldName) {
MappedFieldType fieldType = new HistogramFieldMapper.Builder(fieldName).fieldType();
fieldType.setName(fieldName);
return fieldType;
}
}

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import com.tdunning.math.stats.Centroid;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
@@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import org.hamcrest.Matchers;
import java.io.IOException;

View File

@@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.xpack.analytics.aggregations.metrics;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
@@ -34,6 +34,7 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import java.io.IOException;
import java.util.Arrays;

View File

@@ -19,10 +19,10 @@ setup:
- '{"index": {}}'
- '{"latency": {"values" : [0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [3, 7, 23, 12, 6]}}'
- '{"index": {}}'
- '{"latency": {"values" : [0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [2, 5, 10, 1, 8]}}'
- '{"latency": {"values" : [0, 0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [3, 2, 5, 10, 1, 8]}}'
---
"Histogram Sum Aggregation":
"Histogram Aggregations":
- do:
search:
@@ -33,8 +33,17 @@ setup:
histo_sum:
sum:
field: "latency"
histo_value_count:
value_count:
field: "latency"
histo_avg:
avg:
field: "latency"
- match: { hits.total.value: 2 }
- match: { aggregations.histo_sum.value: 25 }
- match: { aggregations.histo_value_count.value: 80 }
- match: { aggregations.histo_avg.value: 0.3125 }