Promote longs to doubles when a terms agg mixes decimal and non-decimal numbers (#22449)
* Promote longs to doubles when a terms agg mixes decimal and non-decimal numbers

This change makes the terms aggregation work when the buckets coming from different indices are a mix of decimal and non-decimal numbers. In this case the non-decimal numbers (longs) are promoted to decimals (doubles), which can result in a loss of precision for big numbers.

Fixes #22232
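For context, a minimal standalone Java sketch (illustrative only, not part of this commit; the class name is made up) of why the promotion can lose precision: a double has a 53-bit mantissa, so longs with a magnitude above 2^53 cannot all be represented exactly.

    // Illustrative sketch: promoting a long to a double rounds values above 2^53.
    public class LongPromotionPrecision {
        public static void main(String[] args) {
            long big = (1L << 53) + 1;             // 9007199254740993
            double promoted = (double) big;        // rounds to the nearest double, 9007199254740992.0
            System.out.println(big);               // 9007199254740993
            System.out.println((long) promoted);   // 9007199254740992 -> the +1 is lost
        }
    }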
This commit is contained in:
parent cb2333dacd
commit 433c822d4f
@@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -152,4 +154,32 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bucket>
    protected Bucket[] createBucketsArray(int size) {
        return new Bucket[size];
    }

    @Override
    public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
        boolean promoteToDouble = false;
        for (InternalAggregation agg : aggregations) {
            if (agg instanceof LongTerms && ((LongTerms) agg).format == DocValueFormat.RAW) {
                /**
                 * this terms agg mixes longs and doubles, we must promote longs to doubles to make the internal aggs
                 * compatible
                 */
                promoteToDouble = true;
                break;
            }
        }
        if (promoteToDouble == false) {
            return super.doReduce(aggregations, reduceContext);
        }
        List<InternalAggregation> newAggs = new ArrayList<>();
        for (InternalAggregation agg : aggregations) {
            if (agg instanceof LongTerms) {
                DoubleTerms dTerms = LongTerms.convertLongTermsToDouble((LongTerms) agg, format);
                newAggs.add(dTerms);
            } else {
                newAggs.add(agg);
            }
        }
        return newAggs.get(0).doReduce(newAggs, reduceContext);
    }
}
@@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -152,4 +154,32 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
    protected Bucket[] createBucketsArray(int size) {
        return new Bucket[size];
    }

    @Override
    public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
        for (InternalAggregation agg : aggregations) {
            if (agg instanceof DoubleTerms) {
                return agg.doReduce(aggregations, reduceContext);
            }
        }
        return super.doReduce(aggregations, reduceContext);
    }

    /**
     * Converts a {@link LongTerms} into a {@link DoubleTerms}, returning the value of the specified long terms as doubles.
     */
    static DoubleTerms convertLongTermsToDouble(LongTerms longTerms, DocValueFormat decimalFormat) {
        List<Terms.Bucket> buckets = longTerms.getBuckets();
        List<DoubleTerms.Bucket> newBuckets = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            newBuckets.add(new DoubleTerms.Bucket(bucket.getKeyAsNumber().doubleValue(),
                bucket.getDocCount(), (InternalAggregations) bucket.getAggregations(), longTerms.showTermDocCountError,
                longTerms.showTermDocCountError ? bucket.getDocCountError() : 0, decimalFormat));
        }
        return new DoubleTerms(longTerms.getName(), longTerms.order, longTerms.requiredSize,
            longTerms.minDocCount, longTerms.pipelineAggregators(),
            longTerms.metaData, longTerms.format, longTerms.shardSize,
            longTerms.showTermDocCountError, longTerms.otherDocCount,
            newBuckets, longTerms.docCountError);
    }
}
@@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.terms;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
@@ -27,11 +28,21 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.ValueType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TermsAggregatorTests extends AggregatorTestCase {

    public void testTermsAggregator() throws Exception {
@@ -83,4 +94,108 @@ public class TermsAggregatorTests extends AggregatorTestCase {
        directory.close();
    }

    public void testMixLongAndDouble() throws IOException {
        for (TermsAggregatorFactory.ExecutionMode executionMode : TermsAggregatorFactory.ExecutionMode.values()) {
            TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name", ValueType.LONG)
                .executionHint(executionMode.toString())
                .field("number")
                .order(Terms.Order.term(true));
            List<InternalAggregation> aggs = new ArrayList<>();
            int numLongs = randomIntBetween(1, 3);
            for (int i = 0; i < numLongs; i++) {
                final Directory dir;
                try (IndexReader reader = createIndexWithLongs()) {
                    dir = ((DirectoryReader) reader).directory();
                    IndexSearcher searcher = new IndexSearcher(reader);
                    MappedFieldType fieldType =
                        new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
                    fieldType.setName("number");
                    fieldType.setHasDocValues(true);
                    aggs.add(buildInternalAggregation(aggregationBuilder, fieldType, searcher));
                }
                dir.close();
            }
            int numDoubles = randomIntBetween(1, 3);
            for (int i = 0; i < numDoubles; i++) {
                final Directory dir;
                try (IndexReader reader = createIndexWithDoubles()) {
                    dir = ((DirectoryReader) reader).directory();
                    IndexSearcher searcher = new IndexSearcher(reader);
                    MappedFieldType fieldType =
                        new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE);
                    fieldType.setName("number");
                    fieldType.setHasDocValues(true);
                    aggs.add(buildInternalAggregation(aggregationBuilder, fieldType, searcher));
                }
                dir.close();
            }
            InternalAggregation.ReduceContext ctx =
                new InternalAggregation.ReduceContext(new MockBigArrays(Settings.EMPTY,
                    new NoneCircuitBreakerService()), null);
            for (InternalAggregation internalAgg : aggs) {
                InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx);
                assertTrue(mergedAggs instanceof DoubleTerms);
                long expected = numLongs + numDoubles;
                List<Terms.Bucket> buckets = ((DoubleTerms) mergedAggs).getBuckets();
                assertEquals(4, buckets.size());
                assertEquals("1.0", buckets.get(0).getKeyAsString());
                assertEquals(expected, buckets.get(0).getDocCount());
                assertEquals("10.0", buckets.get(1).getKeyAsString());
                assertEquals(expected * 2, buckets.get(1).getDocCount());
                assertEquals("100.0", buckets.get(2).getKeyAsString());
                assertEquals(expected * 2, buckets.get(2).getDocCount());
                assertEquals("1000.0", buckets.get(3).getKeyAsString());
                assertEquals(expected, buckets.get(3).getDocCount());
            }
        }
    }

    private IndexReader createIndexWithLongs() throws IOException {
        Directory directory = newDirectory();
        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
        Document document = new Document();
        document.add(new SortedNumericDocValuesField("number", 10));
        document.add(new SortedNumericDocValuesField("number", 100));
        indexWriter.addDocument(document);
        document = new Document();
        document.add(new SortedNumericDocValuesField("number", 1));
        document.add(new SortedNumericDocValuesField("number", 100));
        indexWriter.addDocument(document);
        document = new Document();
        document.add(new SortedNumericDocValuesField("number", 10));
        document.add(new SortedNumericDocValuesField("number", 1000));
        indexWriter.addDocument(document);
        indexWriter.close();
        return DirectoryReader.open(directory);
    }

    private IndexReader createIndexWithDoubles() throws IOException {
        Directory directory = newDirectory();
        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
        Document document = new Document();
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(10.0d)));
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(100.0d)));
        indexWriter.addDocument(document);
        document = new Document();
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(1.0d)));
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(100.0d)));
        indexWriter.addDocument(document);
        document = new Document();
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(10.0d)));
        document.add(new SortedNumericDocValuesField("number", NumericUtils.doubleToSortableLong(1000.0d)));
        indexWriter.addDocument(document);
        indexWriter.close();
        return DirectoryReader.open(directory);
    }

    private InternalAggregation buildInternalAggregation(TermsAggregationBuilder builder, MappedFieldType fieldType,
                                                         IndexSearcher searcher) throws IOException {
        try (TermsAggregator aggregator = createAggregator(builder, fieldType, searcher)) {
            aggregator.preCollection();
            searcher.search(new MatchAllDocsQuery(), aggregator);
            aggregator.postCollection();
            return aggregator.buildAggregation(0L);
        }
    }
}
@@ -794,3 +794,10 @@ had a value.
--------------------------------------------------

<1> Documents without a value in the `tags` field will fall into the same bucket as documents that have the value `N/A`.

==== Mixing field types

WARNING: When aggregating on multiple indices the type of the aggregated field may not be the same in all indices.
Some types are compatible with each other (`integer` and `long` or `float` and `double`) but when the types are a mix
of decimal and non-decimal numbers the terms aggregation will promote the non-decimal numbers to decimal numbers.
This can result in a loss of precision in the bucket values.
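To make the documented behavior concrete, here is a small standalone Java sketch (illustrative only, not part of the docs change; the class name is made up) of the user-visible effect: bucket keys that were indexed as whole numbers come back formatted as decimals after promotion, which is what the REST test below asserts.

    // Illustrative only: mimics how a long bucket key is reported once the terms agg
    // promotes longs to doubles (compare the 10.0 / 100.0 keys in the YAML test below).
    public class PromotedKeyFormat {
        public static void main(String[] args) {
            long indexedKey = 10L;                        // value indexed into the long-mapped index
            double promotedKey = (double) indexedKey;     // key type after the reduce-time promotion
            System.out.println(Double.toString(promotedKey)); // prints "10.0"
        }
    }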
@@ -18,12 +18,26 @@ setup:
                  type: long
                double:
                  type: double
                number:
                  type: long
                scaled_float:
                  type: scaled_float
                  scaling_factor: 100
                date:
                  type: date

  - do:
      indices.create:
          index: test_2
          body:
            settings:
              number_of_replicas: 0
            mappings:
              test:
                properties:
                  number:
                    type: double

  - do:
      cluster.health:
        wait_for_status: green
@@ -143,6 +157,7 @@ setup:
  - do:
      catch: request
      search:
        index: test_1
        body: { "size" : 0, "aggs" : { "ip_terms" : { "terms" : { "field" : "ip", "exclude" : "127.*" } } } }
@@ -667,3 +682,68 @@ setup:
  - match: { aggregations.double_terms.buckets.0.key: 3.5 }

  - match: { aggregations.double_terms.buckets.0.doc_count: 1 }

---
"Mixing longs and doubles":

  - skip:
      version: " - 5.99.99"
      reason: in 6.0 longs and doubles are compatible within a terms agg (longs are promoted to doubles)

  - do:
      index:
        index: test_1
        type: test
        id: 1
        body: {"number": 100}

  - do:
      index:
        index: test_1
        type: test
        id: 2
        body: {"number": 10}

  - do:
      index:
        index: test_2
        type: test
        id: 3
        body: {"number": 100.0}

  - do:
      index:
        index: test_2
        type: test
        id: 1
        body: {"number": 10.0}

  - do:
      index:
        index: test_2
        type: test
        id: 2
        body: {"number": 14.6}

  - do:
      indices.refresh: {}

  - do:
      search:
        body: { "size" : 0, "aggs" : { "number_terms" : { "terms" : { "field" : "number" } } } }

  - match: { hits.total: 5 }

  - length: { aggregations.number_terms.buckets: 3 }

  - match: { aggregations.number_terms.buckets.0.key: 10.0 }

  - match: { aggregations.number_terms.buckets.0.doc_count: 2 }

  - match: { aggregations.number_terms.buckets.1.key: 100.0 }

  - match: { aggregations.number_terms.buckets.1.doc_count: 2 }

  - match: { aggregations.number_terms.buckets.2.key: 14.6 }

  - match: { aggregations.number_terms.buckets.2.doc_count: 1 }