New Histogram field mapper that supports percentiles aggregations. (#48580) (#49683)

This commit adds a new histogram field mapper that stores a pre-aggregated representation of numerical data for use in percentiles aggregations.
This commit is contained in:
Ignacio Vera 2019-11-28 15:06:26 +01:00 committed by GitHub
parent 04e9cbd6eb
commit 326fe7566e
32 changed files with 2127 additions and 72 deletions

View File

@ -2,9 +2,9 @@
=== Percentiles Aggregation
A `multi-value` metrics aggregation that calculates one or more percentiles
over numeric values extracted from the aggregated documents. These values
can be extracted either from specific numeric fields in the documents, or
be generated by a provided script.
over numeric values extracted from the aggregated documents. These values can be
generated by a provided script or extracted from specific numeric or
<<histogram,histogram fields>> in the documents.
Percentiles show the point at which a certain percentage of observed values
occur. For example, the 95th percentile is the value which is greater than 95%

View File

@ -2,9 +2,9 @@
=== Percentile Ranks Aggregation
A `multi-value` metrics aggregation that calculates one or more percentile ranks
over numeric values extracted from the aggregated documents. These values
can be extracted either from specific numeric fields in the documents, or
be generated by a provided script.
over numeric values extracted from the aggregated documents. These values can be
generated by a provided script or extracted from specific numeric or
<<histogram,histogram fields>> in the documents.
[NOTE]
==================================================

View File

@ -32,6 +32,7 @@ string:: <<text,`text`>> and <<keyword,`keyword`>>
<<ip>>:: `ip` for IPv4 and IPv6 addresses
<<completion-suggester,Completion datatype>>::
`completion` to provide auto-complete suggestions
<<token-count>>:: `token_count` to count the number of tokens in a string
{plugins}/mapper-murmur3.html[`mapper-murmur3`]:: `murmur3` to compute hashes of values at index-time and store them in the index
{plugins}/mapper-annotated-text.html[`mapper-annotated-text`]:: `annotated-text` to index text containing special markup (typically used for identifying named entities)
@ -56,6 +57,8 @@ string:: <<text,`text`>> and <<keyword,`keyword`>>
<<shape>>:: `shape` for arbitrary cartesian geometries.
<<histogram>>:: `histogram` for pre-aggregated numerical values for percentiles aggregations.
[float]
[[types-array-handling]]
=== Arrays
@ -91,6 +94,8 @@ include::types/date_nanos.asciidoc[]
include::types/dense-vector.asciidoc[]
include::types/histogram.asciidoc[]
include::types/flattened.asciidoc[]
include::types/geo-point.asciidoc[]

View File

@ -0,0 +1,119 @@
[role="xpack"]
[testenv="basic"]
[[histogram]]
=== Histogram datatype
++++
<titleabbrev>Histogram</titleabbrev>
++++
A field to store pre-aggregated numerical data representing a histogram.
This data is defined using two paired arrays:
* A `values` array of <<number, `double`>> numbers, representing the buckets for
the histogram. These values must be provided in ascending order.
* A corresponding `counts` array of <<number, `integer`>> numbers, representing how
many values fall into each bucket. These numbers must be positive or zero.
Because the elements in the `values` array correspond to the elements in the
same position of the `counts` array, these two arrays must have the same length.
[IMPORTANT]
========
* A `histogram` field can only store a single pair of `values` and `counts` arrays
per document. Nested arrays are not supported.
* `histogram` fields do not support sorting.
========
[[histogram-uses]]
==== Uses
`histogram` fields are primarily intended for use with aggregations. To make it
more readily accessible for aggregations, `histogram` field data is stored as
binary <<doc-values,doc values>> and not indexed. Its size in bytes is at most
`13 * numValues`, where `numValues` is the length of the provided arrays: each
entry is stored as a variable-length integer count (at most 5 bytes) followed by
an 8-byte `double` value.
Because the data is not indexed, you can only use `histogram` fields for the
following aggregations and queries:
* <<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation
* <<search-aggregations-metrics-percentile-rank-aggregation,percentile ranks>> aggregation
* <<query-dsl-exists-query,exists>> query
[[mapping-types-histogram-building-histogram]]
==== Building a histogram
When using a histogram as part of an aggregation, the accuracy of the results will depend on how the
histogram was constructed. It is important to consider the percentiles aggregation mode that will be used
when building it. Some possibilities include:
- For the <<search-aggregations-metrics-percentile-aggregation, T-Digest>> mode, the `values` array represents
the mean centroid positions and the `counts` array represents the number of values that are attributed to each
centroid. If the algorithm has already started to approximate the percentiles, this inaccuracy is
carried over into the histogram.
- For the <<_hdr_histogram,High Dynamic Range (HDR)>> histogram mode, the `values` array represents fixed upper
limits of each bucket interval, and the `counts` array represents the number of values that are attributed to each
interval. This implementation maintains a fixed worst-case percentage error (specified as a number of significant digits),
so the precision used when generating the histogram is the maximum accuracy you can achieve at aggregation time.
The histogram field is "algorithm agnostic" and does not store data specific to either T-Digest or HDRHistogram. While this
means the field can technically be aggregated with either algorithm, in practice the user should choose one algorithm and
index data in that manner (e.g. centroids for T-Digest or intervals for HDRHistogram) to ensure best accuracy; the sketch
below illustrates the HDR case.
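The following is an illustrative sketch only (it is not part of this change or of any documented API): it shows how raw
samples could be pre-aggregated on the client side into the paired `values`/`counts` arrays using the HdrHistogram
library, the same library used by the HDR percentiles mode and by this change's tests. The class name and sample values
are made up for the example.
[source,java]
--------------------------------------------------
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class HistogramPreAggregationSketch {
    public static void main(String[] args) {
        // Record raw samples into an HDR sketch with 3 significant digits of precision.
        DoubleHistogram histogram = new DoubleHistogram(3);
        for (double sample : new double[] {0.12, 0.25, 0.25, 3.5, 10.1}) {
            histogram.recordValue(sample);
        }

        // Turn the recorded intervals into the paired "values"/"counts" arrays
        // expected by the histogram field (values come out in ascending order).
        List<Double> values = new ArrayList<>();
        List<Integer> counts = new ArrayList<>();
        Iterator<DoubleHistogramIterationValue> it = histogram.recordedValues().iterator();
        while (it.hasNext()) {
            DoubleHistogramIterationValue bucket = it.next();
            values.add(bucket.getValueIteratedTo());
            counts.add(Math.toIntExact(bucket.getCountAtValueIteratedTo()));
        }
        System.out.println("values: " + values + ", counts: " + counts);
    }
}
--------------------------------------------------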
[[histogram-ex]]
==== Examples
The following <<indices-create-index, create index>> API request creates a new index with two field mappings:
* `my_histogram`, a `histogram` field used to store percentile data
* `my_text`, a `keyword` field used to store a title for the histogram
[source,console]
--------------------------------------------------
PUT my_index
{
"mappings": {
"properties": {
"my_histogram": {
"type" : "histogram"
},
"my_text" : {
"type" : "keyword"
}
}
}
}
--------------------------------------------------
The following <<docs-index_,index>> API requests store pre-aggregated data for
two histograms: `histogram_1` and `histogram_2`.
[source,console]
--------------------------------------------------
PUT my_index/_doc/1
{
"my_text" : "histogram_1",
"my_histogram" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
"counts" : [3, 7, 23, 12, 6] <2>
}
}
PUT my_index/_doc/2
{
"my_text" : "histogram_2",
"my_histogram" : {
"values" : [0.1, 0.25, 0.35, 0.4, 0.45, 0.5], <1>
"counts" : [8, 17, 8, 7, 6, 2] <2>
}
}
--------------------------------------------------
<1> Values for each bucket. Values in the array are treated as doubles and must be given in
increasing order. For <<search-aggregations-metrics-percentile-aggregation-approximation, T-Digest>>
histograms this value represents the mean centroid position. For HDR histograms this represents the value iterated to.
<2> Count for each bucket. Values in the arrays are treated as integers and must be positive or zero.
Negative values will be rejected. The relation between a bucket and a count is given by the position in the array.
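Once indexed, percentiles over the pre-aggregated data are computed at search time by the aggregations listed in the
<<histogram-uses,Uses>> section. As an illustration only (not part of this change), here is a minimal Java sketch that runs
a percentiles aggregation over the `my_histogram` field from the example above using the high-level REST client; the client
setup, class name, and aggregation name are assumptions.
[source,java]
--------------------------------------------------
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class HistogramPercentilesExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Percentiles over the pre-aggregated histogram field from the example mapping.
            SearchRequest request = new SearchRequest("my_index")
                .source(new SearchSourceBuilder()
                    .size(0)
                    .aggregation(AggregationBuilders.percentiles("my_percentiles")
                        .field("my_histogram")));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            Percentiles percentiles = response.getAggregations().get("my_percentiles");
            for (Percentile p : percentiles) {
                System.out.println(p.getPercent() + "th percentile: " + p.getValue());
            }
        }
    }
}
--------------------------------------------------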

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata;
import java.io.IOException;
/**
* {@link AtomicFieldData} specialization for histogram data.
*/
public interface AtomicHistogramFieldData extends AtomicFieldData {
/**
* Return Histogram values.
*/
HistogramValues getHistogramValues() throws IOException;
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata;
import java.io.IOException;
/**
* Per-document histogram value. Every value of the histogram consists of
* a value and a count.
*/
public abstract class HistogramValue {
/**
* Advance this instance to the next value of the histogram
* @return true if there is a next value
*/
public abstract boolean next() throws IOException;
/**
* the current value of the histogram
* @return the current value of the histogram
*/
public abstract double value();
/**
* The current count of the histogram
* @return the current count of the histogram
*/
public abstract int count();
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata;
import java.io.IOException;
/**
* Per-segment histogram values.
*/
public abstract class HistogramValues {
/**
* Advance this instance to the given document id
* @return true if there is a value for this document
*/
public abstract boolean advanceExact(int doc) throws IOException;
/**
* Get the {@link HistogramValue} associated with the current document.
* The returned {@link HistogramValue} might be reused across calls.
*/
public abstract HistogramValue histogram() throws IOException;
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData;
/**
* Specialization of {@link IndexFieldData} for histograms.
*/
public abstract class IndexHistogramFieldData extends DocValuesIndexFieldData implements IndexFieldData<AtomicHistogramFieldData> {
public IndexHistogramFieldData(Index index, String fieldName) {
super(index, fieldName);
}
}

View File

@ -26,12 +26,13 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
@ -47,13 +48,13 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator
}
protected final double[] keys;
protected final ValuesSource.Numeric valuesSource;
protected final ValuesSource valuesSource;
protected final DocValueFormat format;
protected ObjectArray<DoubleHistogram> states;
protected final int numberOfSignificantValueDigits;
protected final boolean keyed;
AbstractHDRPercentilesAggregator(String name, ValuesSource.Numeric valuesSource, SearchContext context, Aggregator parent,
AbstractHDRPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] keys, int numberOfSignificantValueDigits, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
@ -77,25 +78,22 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
if (valuesSource instanceof ValuesSource.Histogram) {
final HistogramValues values = ((ValuesSource.Histogram)valuesSource).getHistogramValues(ctx);
return collectHistogramValues(values, bigArrays, sub);
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return collectNumeric(values, bigArrays, sub);
}
}
private LeafBucketCollector collectNumeric(final SortedNumericDoubleValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
states = bigArrays.grow(states, bucket + 1);
DoubleHistogram state = states.get(bucket);
if (state == null) {
state = new DoubleHistogram(numberOfSignificantValueDigits);
// Set the histogram to autosize so it can resize itself as
// the data range increases. Resize operations should be
// rare as the histogram buckets are exponential (on the top
// level). In the future we could expose the range as an
// option on the request so the histogram can be fixed at
// initialisation and doesn't need resizing.
state.setAutoResize(true);
states.set(bucket, state);
}
DoubleHistogram state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; i++) {
@ -106,6 +104,39 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator
};
}
private LeafBucketCollector collectHistogramValues(final HistogramValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
DoubleHistogram state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while (sketch.next()) {
state.recordValueWithCount(sketch.value(), sketch.count());
}
}
}
};
}
private DoubleHistogram getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
states = bigArrays.grow(states, bucket + 1);
DoubleHistogram state = states.get(bucket);
if (state == null) {
state = new DoubleHistogram(numberOfSignificantValueDigits);
// Set the histogram to autosize so it can resize itself as
// the data range increases. Resize operations should be
// rare as the histogram buckets are exponential (on the top
// level). In the future we could expose the range as an
// option on the request so the histogram can be fixed at
// initialisation and doesn't need resizing.
state.setAutoResize(true);
states.set(bucket, state);
}
return state;
}
@Override
public boolean hasMetric(String name) {
return indexOfKey(keys, Double.parseDouble(name)) >= 0;

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
@ -45,13 +47,13 @@ abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggreg
}
protected final double[] keys;
protected final ValuesSource.Numeric valuesSource;
protected final ValuesSource valuesSource;
protected final DocValueFormat formatter;
protected ObjectArray<TDigestState> states;
protected final double compression;
protected final boolean keyed;
AbstractTDigestPercentilesAggregator(String name, ValuesSource.Numeric valuesSource, SearchContext context, Aggregator parent,
AbstractTDigestPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] keys, double compression, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
@ -75,18 +77,22 @@ abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggreg
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
if (valuesSource instanceof ValuesSource.Histogram) {
final HistogramValues values = ((ValuesSource.Histogram)valuesSource).getHistogramValues(ctx);
return collectHistogramValues(values, bigArrays, sub);
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return collectNumeric(values, bigArrays, sub);
}
}
private LeafBucketCollector collectNumeric(final SortedNumericDoubleValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
states = bigArrays.grow(states, bucket + 1);
TDigestState state = states.get(bucket);
if (state == null) {
state = new TDigestState(compression);
states.set(bucket, state);
}
TDigestState state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; i++) {
@ -97,6 +103,32 @@ abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggreg
};
}
private LeafBucketCollector collectHistogramValues(final HistogramValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
TDigestState state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while(sketch.next()) {
state.add(sketch.value(), sketch.count());
}
}
}
};
}
private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
states = bigArrays.grow(states, bucket + 1);
TDigestState state = states.get(bucket);
if (state == null) {
state = new TDigestState(compression);
states.set(bucket, state);
}
return state;
}
@Override
public boolean hasMetric(String name) {
return indexOfKey(keys, Double.parseDouble(name)) >= 0;

View File

@ -23,7 +23,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -32,7 +32,7 @@ import java.util.Map;
class HDRPercentileRanksAggregator extends AbstractHDRPercentilesAggregator {
HDRPercentileRanksAggregator(String name, Numeric valuesSource, SearchContext context, Aggregator parent,
HDRPercentileRanksAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] percents, int numberOfSignificantValueDigits, boolean keyed, DocValueFormat format,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, valuesSource, context, parent, percents, numberOfSignificantValueDigits, keyed, format, pipelineAggregators,

View File

@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
@ -35,13 +34,13 @@ import java.util.List;
import java.util.Map;
class HDRPercentileRanksAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
extends ValuesSourceAggregatorFactory<ValuesSource> {
private final double[] values;
private final int numberOfSignificantValueDigits;
private final boolean keyed;
HDRPercentileRanksAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, double[] values,
HDRPercentileRanksAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config, double[] values,
int numberOfSignificantValueDigits, boolean keyed, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
@ -61,7 +60,7 @@ class HDRPercentileRanksAggregatorFactory
}
@Override
protected Aggregator doCreateInternal(Numeric valuesSource,
protected Aggregator doCreateInternal(ValuesSource valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,

View File

@ -23,7 +23,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -32,7 +32,7 @@ import java.util.Map;
class HDRPercentilesAggregator extends AbstractHDRPercentilesAggregator {
HDRPercentilesAggregator(String name, Numeric valuesSource, SearchContext context, Aggregator parent, double[] percents,
HDRPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent, double[] percents,
int numberOfSignificantValueDigits, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, valuesSource, context, parent, percents, numberOfSignificantValueDigits, keyed, formatter,

View File

@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
@ -34,14 +33,14 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
class HDRPercentilesAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
class HDRPercentilesAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource> {
private final double[] percents;
private final int numberOfSignificantValueDigits;
private final boolean keyed;
HDRPercentilesAggregatorFactory(String name,
ValuesSourceConfig<Numeric> config,
ValuesSourceConfig<ValuesSource> config,
double[] percents,
int numberOfSignificantValueDigits,
boolean keyed,
@ -66,7 +65,7 @@ class HDRPercentilesAggregatorFactory extends ValuesSourceAggregatorFactory<Valu
}
@Override
protected Aggregator doCreateInternal(Numeric valuesSource,
protected Aggregator doCreateInternal(ValuesSource valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,

View File

@ -33,7 +33,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder.LeafOnly;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@ -47,7 +46,7 @@ import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource.Numeric, PercentileRanksAggregationBuilder> {
public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource, PercentileRanksAggregationBuilder> {
public static final String NAME = PercentileRanks.TYPE_NAME;
public static final ParseField VALUES_FIELD = new ParseField("values");
@ -80,7 +79,7 @@ public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource.Num
static {
PARSER = new ConstructingObjectParser<>(PercentileRanksAggregationBuilder.NAME, false,
(a, context) -> new PercentileRanksAggregationBuilder(context, (List) a[0]));
ValuesSourceParserHelper.declareNumericFields(PARSER, true, false, false);
ValuesSourceParserHelper.declareAnyFields(PARSER, true, true);
PARSER.declareDoubleArray(constructorArg(), VALUES_FIELD);
PARSER.declareBoolean(PercentileRanksAggregationBuilder::keyed, PercentilesAggregationBuilder.KEYED_FIELD);
@ -240,8 +239,10 @@ public class PercentileRanksAggregationBuilder extends LeafOnly<ValuesSource.Num
}
@Override
protected ValuesSourceAggregatorFactory<Numeric> innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig<Numeric> config,
AggregatorFactory parent, Builder subFactoriesBuilder) throws IOException {
protected ValuesSourceAggregatorFactory<ValuesSource> innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig<ValuesSource> config,
AggregatorFactory parent,
Builder subFactoriesBuilder) throws IOException {
switch (method) {
case TDIGEST:
return new TDigestPercentileRanksAggregatorFactory(name, config, values, compression, keyed, queryShardContext, parent,

View File

@ -32,7 +32,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder.LeafOnly;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@ -44,7 +43,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource.Numeric, PercentilesAggregationBuilder> {
public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource, PercentilesAggregationBuilder> {
public static final String NAME = Percentiles.TYPE_NAME;
private static final double[] DEFAULT_PERCENTS = new double[] { 1, 5, 25, 50, 75, 95, 99 };
@ -79,7 +78,7 @@ public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource.Numeric
private static final ObjectParser<InternalBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(PercentilesAggregationBuilder.NAME);
ValuesSourceParserHelper.declareNumericFields(PARSER, true, true, false);
ValuesSourceParserHelper.declareAnyFields(PARSER, true, true);
PARSER.declareDoubleArray(
(b, v) -> b.percentiles(v.stream().mapToDouble(Double::doubleValue).toArray()),
@ -263,8 +262,8 @@ public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource.Numeric
}
@Override
protected ValuesSourceAggregatorFactory<Numeric> innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig<Numeric> config,
protected ValuesSourceAggregatorFactory<ValuesSource> innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig<ValuesSource> config,
AggregatorFactory parent,
Builder subFactoriesBuilder) throws IOException {
switch (method) {

View File

@ -22,7 +22,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -32,7 +32,7 @@ import java.util.Map;
class TDigestPercentileRanksAggregator extends AbstractTDigestPercentilesAggregator {
TDigestPercentileRanksAggregator(String name,
Numeric valuesSource,
ValuesSource valuesSource,
SearchContext context,
Aggregator parent,
double[] percents,

View File

@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
@ -35,14 +34,14 @@ import java.util.List;
import java.util.Map;
class TDigestPercentileRanksAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
extends ValuesSourceAggregatorFactory<ValuesSource> {
private final double[] percents;
private final double compression;
private final boolean keyed;
TDigestPercentileRanksAggregatorFactory(String name,
ValuesSourceConfig<Numeric> config,
ValuesSourceConfig<ValuesSource> config,
double[] percents,
double compression,
boolean keyed,
@ -66,7 +65,7 @@ class TDigestPercentileRanksAggregatorFactory
}
@Override
protected Aggregator doCreateInternal(Numeric valuesSource,
protected Aggregator doCreateInternal(ValuesSource valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,

View File

@ -22,7 +22,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -32,7 +32,7 @@ import java.util.Map;
class TDigestPercentilesAggregator extends AbstractTDigestPercentilesAggregator {
TDigestPercentilesAggregator(String name,
Numeric valuesSource,
ValuesSource valuesSource,
SearchContext context,
Aggregator parent,
double[] percents,

View File

@ -25,7 +25,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
@ -35,13 +34,13 @@ import java.util.List;
import java.util.Map;
class TDigestPercentilesAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
extends ValuesSourceAggregatorFactory<ValuesSource> {
private final double[] percents;
private final double compression;
private final boolean keyed;
TDigestPercentilesAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, double[] percents,
TDigestPercentilesAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config, double[] percents,
double compression, boolean keyed, QueryShardContext queryShardContext, AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
@ -60,7 +59,7 @@ class TDigestPercentilesAggregatorFactory
}
@Override
protected Aggregator doCreateInternal(Numeric valuesSource,
protected Aggregator doCreateInternal(ValuesSource valuesSource,
SearchContext searchContext,
Aggregator parent,
boolean collectsFromSingleBucket,

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -191,6 +192,34 @@ public enum CoreValuesSourceType implements Writeable, ValuesSourceType {
return new ValuesSource.Range(fieldContext.indexFieldData(), rangeFieldType.rangeType());
}
@Override
public ValuesSource replaceMissing(ValuesSource valuesSource, Object rawMissing, DocValueFormat docValueFormat, LongSupplier now) {
throw new IllegalArgumentException("Can't apply missing values on a " + valuesSource.getClass());
}
},
HISTOGRAM {
@Override
public ValuesSource getEmpty() {
// TODO: Is this the correct exception type here?
throw new IllegalArgumentException("Can't deal with unmapped ValuesSource type " + this.value());
}
@Override
public ValuesSource getScript(AggregationScript.LeafFactory script, ValueType scriptValueType) {
throw new AggregationExecutionException("value source of type [" + this.value() + "] is not supported by scripts");
}
@Override
public ValuesSource getField(FieldContext fieldContext, AggregationScript.LeafFactory script) {
final IndexFieldData<?> indexFieldData = fieldContext.indexFieldData();
if (!(indexFieldData instanceof IndexHistogramFieldData)) {
throw new IllegalArgumentException("Expected histogram type on field [" + fieldContext.field() +
"], but got [" + fieldContext.fieldType().typeName() + "]");
}
return new ValuesSource.Histogram.Fielddata((IndexHistogramFieldData) indexFieldData);
}
@Override
public ValuesSource replaceMissing(ValuesSource valuesSource, Object rawMissing, DocValueFormat docValueFormat, LongSupplier now) {
throw new IllegalArgumentException("Can't apply missing values on a " + valuesSource.getClass());

View File

@ -33,8 +33,10 @@ import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;
import org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData;
import org.elasticsearch.index.fielddata.DocValueBits;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
@ -563,5 +565,39 @@ public abstract class ValuesSource {
}
}
}
public abstract static class Histogram extends ValuesSource {
public abstract HistogramValues getHistogramValues(LeafReaderContext context) throws IOException;
public static class Fielddata extends Histogram {
protected final IndexHistogramFieldData indexFieldData;
public Fielddata(IndexHistogramFieldData indexFieldData) {
this.indexFieldData = indexFieldData;
}
@Override
public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
return indexFieldData.load(context).getBytesValues();
}
@Override
public DocValueBits docsWithValue(LeafReaderContext context) throws IOException {
HistogramValues values = getHistogramValues(context);
return new DocValueBits() {
@Override
public boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
}
};
}
public HistogramValues getHistogramValues(LeafReaderContext context) throws IOException {
return indexFieldData.load(context).getHistogramValues();
}
}
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@ -114,6 +115,8 @@ public class ValuesSourceConfig<VS extends ValuesSource> {
config = new ValuesSourceConfig<>(CoreValuesSourceType.GEOPOINT);
} else if (fieldType instanceof RangeFieldMapper.RangeFieldType) {
config = new ValuesSourceConfig<>(CoreValuesSourceType.RANGE);
} else if (indexFieldData instanceof IndexHistogramFieldData) {
config = new ValuesSourceConfig<>(CoreValuesSourceType.HISTOGRAM);
} else {
if (valueType == null) {
config = new ValuesSourceConfig<>(CoreValuesSourceType.BYTES);
@ -250,7 +253,7 @@ public class ValuesSourceConfig<VS extends ValuesSource> {
public VS toValuesSource(QueryShardContext context, Function<Object, ValuesSource> resolveMissingAny) {
if (!valid()) {
throw new IllegalStateException(
"value source config is invalid; must have either a field context or a script or marked as unwrapped");
"value source config is invalid; must have either a field context or a script or marked as unwrapped");
}
final VS vs;

View File

@ -9,13 +9,16 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
import org.elasticsearch.xpack.core.XPackPlugin;
@ -23,12 +26,14 @@ import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singletonList;
public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugin {
public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugin, MapperPlugin {
// TODO this should probably become more structured once Analytics plugin has more than just one agg
public static AtomicLong cumulativeCardUsage = new AtomicLong(0);
@ -78,4 +83,9 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
modules.add(b -> XPackPlugin.bindFeatureSet(b, AnalyticsFeatureSet.class));
return modules;
}
@Override
public Map<String, Mapper.TypeParser> getMappers() {
return Collections.singletonMap(HistogramFieldMapper.CONTENT_TYPE, new HistogramFieldMapper.TypeParser());
}
}

View File

@ -0,0 +1,442 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import com.carrotsearch.hppc.DoubleArrayList;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentSubParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.AtomicHistogramFieldData;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
/**
* Field Mapper for pre-aggregated histograms.
*/
public class HistogramFieldMapper extends FieldMapper {
public static final String CONTENT_TYPE = "histogram";
public static class Names {
public static final String IGNORE_MALFORMED = "ignore_malformed";
}
public static class Defaults {
public static final Explicit<Boolean> IGNORE_MALFORMED = new Explicit<>(false, false);
public static final HistogramFieldType FIELD_TYPE = new HistogramFieldType();
static {
FIELD_TYPE.setTokenized(false);
FIELD_TYPE.setHasDocValues(true);
FIELD_TYPE.setIndexOptions(IndexOptions.NONE);
FIELD_TYPE.freeze();
}
}
public static final ParseField COUNTS_FIELD = new ParseField("counts");
public static final ParseField VALUES_FIELD = new ParseField("values");
public static class Builder extends FieldMapper.Builder<Builder, HistogramFieldMapper> {
protected Boolean ignoreMalformed;
public Builder(String name) {
super(name, Defaults.FIELD_TYPE, Defaults.FIELD_TYPE);
builder = this;
}
public Builder ignoreMalformed(boolean ignoreMalformed) {
this.ignoreMalformed = ignoreMalformed;
return builder;
}
protected Explicit<Boolean> ignoreMalformed(BuilderContext context) {
if (ignoreMalformed != null) {
return new Explicit<>(ignoreMalformed, true);
}
if (context.indexSettings() != null) {
return new Explicit<>(IGNORE_MALFORMED_SETTING.get(context.indexSettings()), false);
}
return HistogramFieldMapper.Defaults.IGNORE_MALFORMED;
}
public HistogramFieldMapper build(BuilderContext context, String simpleName, MappedFieldType fieldType,
MappedFieldType defaultFieldType, Settings indexSettings,
MultiFields multiFields, Explicit<Boolean> ignoreMalformed, CopyTo copyTo) {
setupFieldType(context);
return new HistogramFieldMapper(simpleName, fieldType, defaultFieldType, indexSettings, multiFields,
ignoreMalformed, copyTo);
}
@Override
public HistogramFieldMapper build(BuilderContext context) {
return build(context, name, fieldType, defaultFieldType, context.indexSettings(),
multiFieldsBuilder.build(this, context), ignoreMalformed(context), copyTo);
}
}
public static class TypeParser implements Mapper.TypeParser {
@Override
public Mapper.Builder<Builder, HistogramFieldMapper> parse(String name,
Map<String, Object> node, ParserContext parserContext)
throws MapperParsingException {
Builder builder = new HistogramFieldMapper.Builder(name);
for (Iterator<Map.Entry<String, Object>> iterator = node.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<String, Object> entry = iterator.next();
String propName = entry.getKey();
Object propNode = entry.getValue();
if (propName.equals(Names.IGNORE_MALFORMED)) {
builder.ignoreMalformed(XContentMapValues.nodeBooleanValue(propNode, name + "." + Names.IGNORE_MALFORMED));
iterator.remove();
}
}
return builder;
}
}
protected Explicit<Boolean> ignoreMalformed;
public HistogramFieldMapper(String simpleName, MappedFieldType fieldType, MappedFieldType defaultFieldType,
Settings indexSettings, MultiFields multiFields, Explicit<Boolean> ignoreMalformed, CopyTo copyTo) {
super(simpleName, fieldType, defaultFieldType, indexSettings, multiFields, copyTo);
this.ignoreMalformed = ignoreMalformed;
}
@Override
protected void doMerge(Mapper mergeWith) {
super.doMerge(mergeWith);
HistogramFieldMapper gpfmMergeWith = (HistogramFieldMapper) mergeWith;
if (gpfmMergeWith.ignoreMalformed.explicit()) {
this.ignoreMalformed = gpfmMergeWith.ignoreMalformed;
}
}
@Override
protected String contentType() {
return CONTENT_TYPE;
}
@Override
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
throw new UnsupportedOperationException("Parsing is implemented in parse(), this method should NEVER be called");
}
public static class HistogramFieldType extends MappedFieldType {
public HistogramFieldType() {
}
HistogramFieldType(HistogramFieldType ref) {
super(ref);
}
@Override
public String typeName() {
return CONTENT_TYPE;
}
@Override
public MappedFieldType clone() {
return new HistogramFieldType(this);
}
@Override
public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName) {
failIfNoDocValues();
return new IndexFieldData.Builder() {
@Override
public IndexFieldData<?> build(IndexSettings indexSettings, MappedFieldType fieldType, IndexFieldDataCache cache,
CircuitBreakerService breakerService, MapperService mapperService) {
return new IndexHistogramFieldData(indexSettings.getIndex(), fieldType.name()) {
@Override
public AtomicHistogramFieldData load(LeafReaderContext context) {
return new AtomicHistogramFieldData() {
@Override
public HistogramValues getHistogramValues() throws IOException {
try {
final BinaryDocValues values = DocValues.getBinary(context.reader(), fieldName);
return new HistogramValues() {
@Override
public boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
}
@Override
public HistogramValue histogram() throws IOException {
try {
return getHistogramValue(values.binaryValue());
} catch (IOException e) {
throw new IOException("Cannot load doc value", e);
}
}
};
} catch (IOException e) {
throw new IOException("Cannot load doc values", e);
}
}
@Override
public ScriptDocValues<?> getScriptValues() {
throw new UnsupportedOperationException("The [" + CONTENT_TYPE + "] field does not " +
"support scripts");
}
@Override
public SortedBinaryDocValues getBytesValues() {
throw new UnsupportedOperationException("String representation of doc values " +
"for [" + CONTENT_TYPE + "] fields is not supported");
}
@Override
public long ramBytesUsed() {
return 0; // Unknown
}
@Override
public void close() {
}
};
}
@Override
public AtomicHistogramFieldData loadDirect(LeafReaderContext context) throws Exception {
return load(context);
}
@Override
public SortField sortField(Object missingValue, MultiValueMode sortMode,
XFieldComparatorSource.Nested nested, boolean reverse) {
throw new UnsupportedOperationException("can't sort on the [" + CONTENT_TYPE + "] field");
}
};
}
private HistogramValue getHistogramValue(final BytesRef bytesRef) throws IOException {
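// The doc value encodes the histogram as a sequence of (count, value) pairs:
// a variable-length int count followed by a double value, mirroring what parse() writes below.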
final ByteBufferStreamInput streamInput = new ByteBufferStreamInput(
ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length));
return new HistogramValue() {
double value;
int count;
boolean isExhausted;
@Override
public boolean next() throws IOException {
if (streamInput.available() > 0) {
count = streamInput.readVInt();
value = streamInput.readDouble();
return true;
}
isExhausted = true;
return false;
}
@Override
public double value() {
if (isExhausted) {
throw new IllegalArgumentException("histogram already exhausted");
}
return value;
}
@Override
public int count() {
if (isExhausted) {
throw new IllegalArgumentException("histogram already exhausted");
}
return count;
}
};
}
};
}
@Override
public Query existsQuery(QueryShardContext context) {
if (hasDocValues()) {
return new DocValuesFieldExistsQuery(name());
} else {
throw new QueryShardException(context, "field " + name() + " of type [" + CONTENT_TYPE + "] " +
"has no doc values and cannot be searched");
}
}
@Override
public Query termQuery(Object value, QueryShardContext context) {
throw new QueryShardException(context, "[" + CONTENT_TYPE + "] fields do not support searching, " +
"use dedicated aggregations instead: ["
+ name() + "]");
}
}
@Override
public void parse(ParseContext context) throws IOException {
if (context.externalValueSet()) {
throw new IllegalArgumentException("Field [" + name() + "] of type [" + typeName() + "] can't be used in multi-fields");
}
context.path().add(simpleName());
XContentParser.Token token = null;
XContentSubParser subParser = null;
try {
token = context.parser().currentToken();
if (token == XContentParser.Token.VALUE_NULL) {
context.path().remove();
return;
}
DoubleArrayList values = null;
IntArrayList counts = null;
// should be an object
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()::getTokenLocation);
subParser = new XContentSubParser(context.parser());
token = subParser.nextToken();
while (token != XContentParser.Token.END_OBJECT) {
// should be a field
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser::getTokenLocation);
String fieldName = subParser.currentName();
if (fieldName.equals(VALUES_FIELD.getPreferredName())) {
token = subParser.nextToken();
// should be an array
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, subParser::getTokenLocation);
values = new DoubleArrayList();
token = subParser.nextToken();
double previousVal = -Double.MAX_VALUE;
while (token != XContentParser.Token.END_ARRAY) {
// should be a number
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser::getTokenLocation);
double val = subParser.doubleValue();
if (val < previousVal) {
// values must be in increasing order
throw new MapperParsingException("error parsing field ["
+ name() + "], ["+ COUNTS_FIELD + "] values must be in increasing order, got [" + val +
"] but previous value was [" + previousVal +"]");
}
values.add(val);
previousVal = val;
token = subParser.nextToken();
}
} else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) {
token = subParser.nextToken();
// should be an array
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, subParser::getTokenLocation);
counts = new IntArrayList();
token = subParser.nextToken();
while (token != XContentParser.Token.END_ARRAY) {
// should be a number
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser::getTokenLocation);
counts.add(subParser.intValue());
token = subParser.nextToken();
}
} else {
throw new MapperParsingException("error parsing field [" +
name() + "], with unknown parameter [" + fieldName + "]");
}
token = subParser.nextToken();
}
if (values == null) {
throw new MapperParsingException("error parsing field ["
+ name() + "], expected field called [" + VALUES_FIELD.getPreferredName() + "]");
}
if (counts == null) {
throw new MapperParsingException("error parsing field ["
+ name() + "], expected field called [" + COUNTS_FIELD.getPreferredName() + "]");
}
if (values.size() != counts.size()) {
throw new MapperParsingException("error parsing field ["
+ name() + "], expected same length from [" + VALUES_FIELD.getPreferredName() +"] and " +
"[" + COUNTS_FIELD.getPreferredName() +"] but got [" + values.size() + " != " + counts.size() +"]");
}
if (fieldType().hasDocValues()) {
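// Encode the histogram as a sequence of (count, value) pairs: a variable-length int count
// followed by a double value. getHistogramValue above decodes the same layout.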
BytesStreamOutput streamOutput = new BytesStreamOutput();
for (int i = 0; i < values.size(); i++) {
int count = counts.get(i);
if (count < 0) {
throw new MapperParsingException("error parsing field ["
+ name() + "], ["+ COUNTS_FIELD + "] elements must be >= 0 but got " + counts.get(i));
} else if (count > 0) {
// we do not add elements with count == 0
streamOutput.writeVInt(count);
streamOutput.writeDouble(values.get(i));
}
}
Field field = new BinaryDocValuesField(simpleName(), streamOutput.bytes().toBytesRef());
streamOutput.close();
if (context.doc().getByKey(fieldType().name()) != null) {
throw new IllegalArgumentException("Field [" + name() + "] of type [" + typeName() +
"] doesn't not support indexing multiple values for the same field in the same document");
}
context.doc().addWithKey(fieldType().name(), field);
}
} catch (Exception ex) {
if (ignoreMalformed.value() == false) {
throw new MapperParsingException("failed to parse field [{}] of type [{}]",
ex, fieldType().name(), fieldType().typeName());
}
if (subParser != null) {
// close the subParser so we advance to the end of the object
subParser.close();
}
context.addIgnoredField(fieldType().name());
}
context.path().remove();
}
@Override
protected void doXContentBody(XContentBuilder builder, boolean includeDefaults, Params params) throws IOException {
super.doXContentBody(builder, includeDefaults, params);
if (includeDefaults || ignoreMalformed.explicit()) {
builder.field(Names.IGNORE_MALFORMED, ignoreMalformed.value());
}
}
}

@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.PercentileRanks;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Iterator;
public class HDRPreAggregatedPercentileRanksAggregatorTests extends AggregatorTestCase {
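// Builds a binary doc value in the same layout the histogram field mapper writes:
// a vInt count followed by a double value for each recorded HDR bucket.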
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
DoubleHistogram histogram = new DoubleHistogram(3);//default
for (double value : values) {
histogram.recordValue(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
DoubleHistogram.RecordedValues recordedValues = histogram.recordedValues();
Iterator<DoubleHistogramIterationValue> iterator = recordedValues.iterator();
while (iterator.hasNext()) {
DoubleHistogramIterationValue value = iterator.next();
long count = value.getCountAtValueIteratedTo();
streamOutput.writeVInt(Math.toIntExact(count));
double d = value.getValueIteratedTo();
streamOutput.writeDouble(d);
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
public void testSimple() throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
Document doc = new Document();
doc.add(getDocValue("field", new double[] {3, 0.2, 10}));
w.addDocument(doc);
PercentileRanksAggregationBuilder aggBuilder = new PercentileRanksAggregationBuilder("my_agg", new double[]{0.1, 0.5, 12})
.field("field")
.method(PercentilesMethod.HDR);
MappedFieldType fieldType = new HistogramFieldMapper.Builder("field").fieldType();
fieldType.setName("field");
try (IndexReader reader = w.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
PercentileRanks ranks = search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
Iterator<Percentile> rankIterator = ranks.iterator();
Percentile rank = rankIterator.next();
assertEquals(0.1, rank.getValue(), 0d);
assertThat(rank.getPercent(), Matchers.equalTo(0d));
rank = rankIterator.next();
assertEquals(0.5, rank.getValue(), 0d);
assertThat(rank.getPercent(), Matchers.greaterThan(0d));
assertThat(rank.getPercent(), Matchers.lessThan(100d));
rank = rankIterator.next();
assertEquals(12, rank.getValue(), 0d);
assertThat(rank.getPercent(), Matchers.equalTo(100d));
assertFalse(rankIterator.hasNext());
assertTrue(AggregationInspectionHelper.hasValue((InternalHDRPercentileRanks)ranks));
}
}
}
}

@@ -0,0 +1,131 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;
import static java.util.Collections.singleton;
public class HDRPreAggregatedPercentilesAggregatorTests extends AggregatorTestCase {
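// Same binary layout as the field mapper (vInt count + double value per bucket),
// explicitly skipping buckets whose count is zero.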
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
DoubleHistogram histogram = new DoubleHistogram(3);//default
for (double value : values) {
histogram.recordValue(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
DoubleHistogram.RecordedValues recordedValues = histogram.recordedValues();
Iterator<DoubleHistogramIterationValue> iterator = recordedValues.iterator();
while (iterator.hasNext()) {
DoubleHistogramIterationValue value = iterator.next();
long count = value.getCountAtValueIteratedTo();
if (count != 0) {
streamOutput.writeVInt(Math.toIntExact(count));
double d = value.getValueIteratedTo();
streamOutput.writeDouble(d);
}
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("wrong_number", new double[]{7, 1})));
}, hdr -> {
//assertEquals(0L, hdr.state.getTotalCount());
assertFalse(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testEmptyField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[0])));
}, hdr -> {
assertFalse(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testSomeMatchesBinaryDocValues() throws IOException {
testCase(new DocValuesFieldExistsQuery("number"), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
}, hdr -> {
//assertEquals(4L, hdr.state.getTotalCount());
double approximation = 0.05d;
assertEquals(10.0d, hdr.percentile(25), approximation);
assertEquals(20.0d, hdr.percentile(50), approximation);
assertEquals(40.0d, hdr.percentile(75), approximation);
assertEquals(60.0d, hdr.percentile(99), approximation);
assertTrue(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testSomeMatchesMultiBinaryDocValues() throws IOException {
testCase(new DocValuesFieldExistsQuery("number"), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
}, hdr -> {
//assertEquals(16L, hdr.state.getTotalCount());
double approximation = 0.05d;
assertEquals(10.0d, hdr.percentile(25), approximation);
assertEquals(20.0d, hdr.percentile(50), approximation);
assertEquals(40.0d, hdr.percentile(75), approximation);
assertEquals(60.0d, hdr.percentile(99), approximation);
assertTrue(AggregationInspectionHelper.hasValue(hdr));
});
}
private void testCase(Query query, CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalHDRPercentiles> verify) throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
buildIndex.accept(indexWriter);
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
PercentilesAggregationBuilder builder =
new PercentilesAggregationBuilder("test").field("number").method(PercentilesMethod.HDR);
MappedFieldType fieldType = new HistogramFieldMapper.Builder("number").fieldType();
fieldType.setName("number");
Aggregator aggregator = createAggregator(builder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalHDRPercentiles) aggregator.buildAggregation(0L));
}
}
}
}

@@ -0,0 +1,509 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class HistogramFieldMapperTests extends ESSingleNodeTestCase {
public void testParseValue() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("values", new double[] {2, 3})
.field("counts", new int[] {0, 4})
.endObject()
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), notNullValue());
}
public void testParseArrayValue() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().startArray("pre_aggregated")
.startObject()
.field("counts", new int[] {2, 2, 3})
.field("values", new double[] {2, 2, 3})
.endObject()
.startObject()
.field("counts", new int[] {2, 2, 3})
.field("values", new double[] {2, 2, 3})
.endObject().endArray()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("doesn't not support indexing multiple values " +
"for the same field in the same document"));
}
public void testEmptyArrays() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("values", new double[] {})
.field("counts", new int[] {})
.endObject()
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), notNullValue());
}
public void testNullValue() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().nullField("pre_aggregated")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
}
public void testMissingFieldCounts() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("values", new double[] {2, 2})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expected field called [counts]"));
}
public void testIgnoreMalformed() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true);
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("values", new double[] {2, 2})
.endObject()
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
}
public void testIgnoreMalformedSkipsKeyword() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true)
.endObject().startObject("otherField").field("type", "keyword");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated", "value")
.field("otherField","value")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
assertThat(doc.rootDoc().getField("otherField"), notNullValue());
}
public void testIgnoreMalformedSkipsArray() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true)
.endObject().startObject("otherField").field("type", "keyword");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated", new int[] {2, 2, 2})
.field("otherField","value")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
assertThat(doc.rootDoc().getField("otherField"), notNullValue());
}
public void testIgnoreMalformedSkipsField() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true)
.endObject().startObject("otherField").field("type", "keyword");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("values", new double[] {2, 2})
.field("typo", new double[] {2, 2})
.endObject()
.field("otherField","value")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
assertThat(doc.rootDoc().getField("otherField"), notNullValue());
}
public void testIgnoreMalformedSkipsObjects() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true)
.endObject().startObject("otherField").field("type", "keyword");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.startObject("values").field("values", new double[] {2, 2})
.startObject("otherData").startObject("more").field("toto", 1)
.endObject().endObject()
.endObject()
.field("counts", new double[] {2, 2})
.endObject()
.field("otherField","value")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
assertThat(doc.rootDoc().getField("otherField"), notNullValue());
}
public void testIgnoreMalformedSkipsEmpty() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram")
.field("ignore_malformed", true)
.endObject().startObject("otherField").field("type", "keyword");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
ParsedDocument doc = defaultMapper.parse(new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject().endObject()
.field("otherField","value")
.endObject()),
XContentType.JSON));
assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue());
assertThat(doc.rootDoc().getField("otherField"), notNullValue());
}
public void testMissingFieldValues() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {2, 2})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expected field called [values]"));
}
public void testUnknownField() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {2, 2})
.field("values", new double[] {2, 2})
.field("unknown", new double[] {2, 2})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("with unknown parameter [unknown]"));
}
public void testFieldArraysDifferentSize() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {2, 2})
.field("values", new double[] {2, 2, 3})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expected same length from [values] and [counts] but got [3 != 2]"));
}
public void testFieldCountsNotArray() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", "bah")
.field("values", new double[] {2, 2, 3})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_ARRAY] but found [VALUE_STRING]"));
}
public void testFieldCountsStringArray() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new String[] {"4", "5", "6"})
.field("values", new double[] {2, 2, 3})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expecting token of type [VALUE_NUMBER] but found [VALUE_STRING]"));
}
public void testFieldValuesStringArray() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {4, 5, 6})
.field("values", new String[] {"2", "2", "3"})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expecting token of type [VALUE_NUMBER] but found [VALUE_STRING]"));
}
public void testFieldValuesNotArray() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {2, 2, 3})
.field("values", "bah")
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_ARRAY] but found [VALUE_STRING]"));
}
public void testCountIsLong() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new long[] {2, 2, Long.MAX_VALUE})
.field("values", new double[] {2 ,2 ,3})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString(" out of range of int"));
}
public void testValuesNotInOrder() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated").startObject()
.field("counts", new int[] {2, 8, 4})
.field("values", new double[] {2 ,3 ,2})
.endObject()
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString(" values must be in increasing order, " +
"got [2.0] but previous value was [3.0]"));
}
public void testFieldNotObject() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().field("pre_aggregated", "bah")
.endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_OBJECT] " +
"but found [VALUE_STRING]"));
}
public void testNegativeCount() throws Exception {
ensureGreen();
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("_doc")
.startObject("properties").startObject("pre_aggregated").field("type", "histogram");
String mapping = Strings.toString(xContentBuilder.endObject().endObject().endObject().endObject());
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser()
.parse("_doc", new CompressedXContent(mapping));
SourceToParse source = new SourceToParse("test", "_doc", "1",
BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject().startObject("pre_aggregated")
.field("counts", new int[] {2, 2, -3})
.field("values", new double[] {2, 2, 3})
.endObject().endObject()),
XContentType.JSON);
Exception e = expectThrows(MapperParsingException.class, () -> defaultMapper.parse(source));
assertThat(e.getCause().getMessage(), containsString("[counts] elements must be >= 0 but got -3"));
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(AnalyticsPlugin.class);
plugins.add(XPackPlugin.class);
return plugins;
}
}

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import org.elasticsearch.index.mapper.FieldTypeTestCase;
import org.elasticsearch.index.mapper.MappedFieldType;
public class HistogramFieldTypeTests extends FieldTypeTestCase {
@Override
protected MappedFieldType createDefaultFieldType() {
return new HistogramFieldMapper.HistogramFieldType();
}
}

@@ -0,0 +1,237 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import com.tdunning.math.stats.Centroid;
import org.HdrHistogram.DoubleHistogram;
import org.HdrHistogram.DoubleHistogramIterationValue;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
public class HistogramPercentileAggregationTests extends ESSingleNodeTestCase {
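// Both tests index the same random values twice: as raw doubles into "raw" and, every
// `frq` documents, as a pre-aggregated histogram into "pre_agg". Percentile results over
// either index (or both combined) must then agree: exactly for HDR, within 1e-2 for t-digest.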
public void testHDRHistogram() throws Exception {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_doc")
.startObject("properties")
.startObject("data")
.field("type", "double")
.endObject()
.endObject()
.endObject()
.endObject();
createIndex("raw");
PutMappingRequest request = new PutMappingRequest("raw").type("_doc").source(xContentBuilder);
client().admin().indices().putMapping(request).actionGet();
XContentBuilder xContentBuilder2 = XContentFactory.jsonBuilder()
.startObject()
.startObject("_doc")
.startObject("properties")
.startObject("data")
.field("type", "histogram")
.endObject()
.endObject()
.endObject()
.endObject();
createIndex("pre_agg");
PutMappingRequest request2 = new PutMappingRequest("pre_agg").type("_doc").source(xContentBuilder2);
client().admin().indices().putMapping(request2).actionGet();
int numberOfSignificantValueDigits = TestUtil.nextInt(random(), 1, 5);
DoubleHistogram histogram = new DoubleHistogram(numberOfSignificantValueDigits);
BulkRequest bulkRequest = new BulkRequest();
int numDocs = 10000;
int frq = 1000;
for (int i =0; i < numDocs; i ++) {
double value = random().nextDouble();
XContentBuilder doc = XContentFactory.jsonBuilder()
.startObject()
.field("data", value)
.endObject();
bulkRequest.add(new IndexRequest("raw").source(doc));
histogram.recordValue(value);
if ((i + 1) % frq == 0) {
client().bulk(bulkRequest);
bulkRequest = new BulkRequest();
List<Double> values = new ArrayList<>();
List<Integer> counts = new ArrayList<>();
Iterator<DoubleHistogramIterationValue> iterator = histogram.recordedValues().iterator();
while (iterator.hasNext()) {
DoubleHistogramIterationValue histValue = iterator.next();
values.add(histValue.getValueIteratedTo());
counts.add(Math.toIntExact(histValue.getCountAtValueIteratedTo()));
}
XContentBuilder preAggDoc = XContentFactory.jsonBuilder()
.startObject()
.startObject("data")
.field("values", values.toArray(new Double[values.size()]))
.field("counts", counts.toArray(new Integer[counts.size()]))
.endObject()
.endObject();
client().prepareIndex("pre_agg", "_doc").setSource(preAggDoc).get();
histogram.reset();
}
}
client().admin().indices().refresh(new RefreshRequest("raw", "pre_agg")).get();
SearchResponse response = client().prepareSearch("raw").setTrackTotalHits(true).get();
assertEquals(numDocs, response.getHits().getTotalHits().value);
response = client().prepareSearch("pre_agg").get();
assertEquals(numDocs / frq, response.getHits().getTotalHits().value);
PercentilesAggregationBuilder builder =
AggregationBuilders.percentiles("agg").field("data").method(PercentilesMethod.HDR)
.numberOfSignificantValueDigits(numberOfSignificantValueDigits).percentiles(10);
SearchResponse responseRaw = client().prepareSearch("raw").addAggregation(builder).get();
SearchResponse responsePreAgg = client().prepareSearch("pre_agg").addAggregation(builder).get();
SearchResponse responseBoth = client().prepareSearch("pre_agg", "raw").addAggregation(builder).get();
InternalHDRPercentiles percentilesRaw = responseRaw.getAggregations().get("agg");
InternalHDRPercentiles percentilesPreAgg = responsePreAgg.getAggregations().get("agg");
InternalHDRPercentiles percentilesBoth = responseBoth.getAggregations().get("agg");
for (int i = 1; i < 100; i++) {
assertEquals(percentilesRaw.percentile(i), percentilesPreAgg.percentile(i), 0.0);
assertEquals(percentilesRaw.percentile(i), percentilesBoth.percentile(i), 0.0);
}
}
public void testTDigestHistogram() throws Exception {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_doc")
.startObject("properties")
.startObject("data")
.field("type", "double")
.endObject()
.endObject()
.endObject()
.endObject();
createIndex("raw");
PutMappingRequest request = new PutMappingRequest("raw").type("_doc").source(xContentBuilder);
client().admin().indices().putMapping(request).actionGet();
XContentBuilder xContentBuilder2 = XContentFactory.jsonBuilder()
.startObject()
.startObject("_doc")
.startObject("properties")
.startObject("data")
.field("type", "histogram")
.endObject()
.endObject()
.endObject()
.endObject();
createIndex("pre_agg");
PutMappingRequest request2 = new PutMappingRequest("pre_agg").type("_doc").source(xContentBuilder2);
client().admin().indices().putMapping(request2).actionGet();
int compression = TestUtil.nextInt(random(), 25, 300);
TDigestState histogram = new TDigestState(compression);
BulkRequest bulkRequest = new BulkRequest();
int numDocs = 10000;
int frq = 1000;
for (int i =0; i < numDocs; i ++) {
double value = random().nextDouble();
XContentBuilder doc = XContentFactory.jsonBuilder()
.startObject()
.field("data", value)
.endObject();
bulkRequest.add(new IndexRequest("raw").source(doc));
histogram.add(value);
if ((i + 1) % frq == 0) {
client().bulk(bulkRequest);
bulkRequest = new BulkRequest();
List<Double> values = new ArrayList<>();
List<Integer> counts = new ArrayList<>();
Collection<Centroid> centroids = histogram.centroids();
for (Centroid centroid : centroids) {
values.add(centroid.mean());
counts.add(centroid.count());
}
XContentBuilder preAggDoc = XContentFactory.jsonBuilder()
.startObject()
.startObject("data")
.field("values", values.toArray(new Double[values.size()]))
.field("counts", counts.toArray(new Integer[counts.size()]))
.endObject()
.endObject();
client().prepareIndex("pre_agg", "_doc").setSource(preAggDoc).get();
histogram = new TDigestState(compression);
}
}
client().admin().indices().refresh(new RefreshRequest("raw", "pre_agg")).get();
SearchResponse response = client().prepareSearch("raw").setTrackTotalHits(true).get();
assertEquals(numDocs, response.getHits().getTotalHits().value);
response = client().prepareSearch("pre_agg").get();
assertEquals(numDocs / frq, response.getHits().getTotalHits().value);
PercentilesAggregationBuilder builder =
AggregationBuilders.percentiles("agg").field("data").method(PercentilesMethod.TDIGEST)
.compression(compression).percentiles(10, 25, 50, 75);
SearchResponse responseRaw = client().prepareSearch("raw").addAggregation(builder).get();
SearchResponse responsePreAgg = client().prepareSearch("pre_agg").addAggregation(builder).get();
SearchResponse responseBoth = client().prepareSearch("raw", "pre_agg").addAggregation(builder).get();
InternalTDigestPercentiles percentilesRaw = responseRaw.getAggregations().get("agg");
InternalTDigestPercentiles percentilesPreAgg = responsePreAgg.getAggregations().get("agg");
InternalTDigestPercentiles percentilesBoth = responseBoth.getAggregations().get("agg");
for (int i = 1; i < 100; i++) {
assertEquals(percentilesRaw.percentile(i), percentilesPreAgg.percentile(i), 1e-2);
assertEquals(percentilesRaw.percentile(i), percentilesBoth.percentile(i), 1e-2);
}
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(AnalyticsPlugin.class);
plugins.add(XPackPlugin.class);
return plugins;
}
}

@@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.PercentileRanks;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
public class TDigestPreAggregatedPercentileRanksAggregatorTests extends AggregatorTestCase {
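// Encodes the compressed t-digest centroids in the mapper's binary layout:
// a vInt centroid count followed by the centroid mean as a double.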
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
TDigest histogram = new TDigestState(100.0); //default
for (double value : values) {
histogram.add(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
Collection<Centroid> centroids = histogram.centroids();
Iterator<Centroid> iterator = centroids.iterator();
while (iterator.hasNext()) {
Centroid centroid = iterator.next();
streamOutput.writeVInt(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
public void testSimple() throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
Document doc = new Document();
doc.add(getDocValue("field", new double[] {3, 0.2, 10}));
w.addDocument(doc);
PercentileRanksAggregationBuilder aggBuilder = new PercentileRanksAggregationBuilder("my_agg", new double[] {0.1, 0.5, 12})
.field("field")
.method(PercentilesMethod.TDIGEST);
MappedFieldType fieldType = new HistogramFieldMapper.Builder("number").fieldType();
fieldType.setName("field");
try (IndexReader reader = w.getReader()) {
IndexSearcher searcher = new IndexSearcher(reader);
PercentileRanks ranks = search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
Iterator<Percentile> rankIterator = ranks.iterator();
Percentile rank = rankIterator.next();
assertEquals(0.1, rank.getValue(), 0d);
// TODO: Fix T-Digest: this assertion should pass but we currently get ~15
// https://github.com/elastic/elasticsearch/issues/14851
// assertThat(rank.getPercent(), Matchers.equalTo(0d));
rank = rankIterator.next();
assertEquals(0.5, rank.getValue(), 0d);
assertThat(rank.getPercent(), Matchers.greaterThan(0d));
assertThat(rank.getPercent(), Matchers.lessThan(100d));
rank = rankIterator.next();
assertEquals(12, rank.getValue(), 0d);
// TODO: Fix T-Digest: this assertion should pass but we currently get ~59
// https://github.com/elastic/elasticsearch/issues/14851
// assertThat(rank.getPercent(), Matchers.equalTo(100d));
assertFalse(rankIterator.hasNext());
assertTrue(AggregationInspectionHelper.hasValue(((InternalTDigestPercentileRanks)ranks)));
}
}
}
}

@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.mapper;
import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.TDigest;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import static java.util.Collections.singleton;
public class TDigestPreAggregatedPercentilesAggregatorTests extends AggregatorTestCase {
private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
TDigest histogram = new TDigestState(100.0); //default
for (double value : values) {
histogram.add(value);
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
histogram.compress();
Collection<Centroid> centroids = histogram.centroids();
Iterator<Centroid> iterator = centroids.iterator();
while (iterator.hasNext()) {
Centroid centroid = iterator.next();
streamOutput.writeVInt(centroid.count());
streamOutput.writeDouble(centroid.mean());
}
return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
}
public void testNoMatchingField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("wrong_number", new double[]{7, 1})));
}, hdr -> {
//assertEquals(0L, hdr.state.getTotalCount());
assertFalse(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testEmptyField() throws IOException {
testCase(new MatchAllDocsQuery(), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[0])));
}, hdr -> {
assertFalse(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testSomeMatchesBinaryDocValues() throws IOException {
testCase(new DocValuesFieldExistsQuery("number"), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
}, hdr -> {
//assertEquals(4L, hdr.state.getTotalCount());
double approximation = 0.05d;
assertEquals(15.0d, hdr.percentile(25), approximation);
assertEquals(30.0d, hdr.percentile(50), approximation);
assertEquals(50.0d, hdr.percentile(75), approximation);
assertEquals(60.0d, hdr.percentile(99), approximation);
assertTrue(AggregationInspectionHelper.hasValue(hdr));
});
}
public void testSomeMatchesMultiBinaryDocValues() throws IOException {
testCase(new DocValuesFieldExistsQuery("number"), iw -> {
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
iw.addDocument(singleton(getDocValue("number", new double[]{60, 40, 20, 10})));
}, hdr -> {
//assertEquals(16L, hdr.state.getTotalCount());
double approximation = 0.05d;
assertEquals(15.0d, hdr.percentile(25), approximation);
assertEquals(30.0d, hdr.percentile(50), approximation);
assertEquals(50.0d, hdr.percentile(75), approximation);
assertEquals(60.0d, hdr.percentile(99), approximation);
assertTrue(AggregationInspectionHelper.hasValue(hdr));
});
}
private void testCase(Query query, CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalTDigestPercentiles> verify) throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
buildIndex.accept(indexWriter);
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
PercentilesAggregationBuilder builder =
new PercentilesAggregationBuilder("test").field("number").method(PercentilesMethod.TDIGEST);
MappedFieldType fieldType = new HistogramFieldMapper.Builder("number").fieldType();
fieldType.setName("number");
Aggregator aggregator = createAggregator(builder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalTDigestPercentiles) aggregator.buildAggregation(0L));
}
}
}
}