Aggregations: Remove `ordinals` execution hint.

This execution mode was how terms aggregations initially managed not to be too slow: it cached reads into the terms dictionary using segment ordinals. However, it does not behave nicely on high-cardinality fields, since reads into the terms dictionary are random and this mode loads all unique terms into memory. The `global_ordinals` execution mode (the default since 1.2) is expected to be better in all cases.

Closes #6499
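For reference, the hint is supplied per request through the terms aggregation builder, the same API the test changes below exercise. A minimal sketch of requesting the surviving default mode explicitly; the client, index, field, and aggregation names are hypothetical:

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;

    import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;

    class ExecutionHintSketch {
        // After this commit the "ordinals" hint is gone; "map" and the
        // global ordinals variants remain. Omitting the hint picks
        // global_ordinals, the default since 1.2.
        static Terms topTags(Client client) {
            SearchResponse resp = client.prepareSearch("index")
                    .addAggregation(terms("tags")
                            .field("tag")
                            .executionHint("global_ordinals")
                            .size(10))
                    .execute().actionGet();
            return resp.getAggregations().get("tags");
        }
    }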
This commit is contained in:
parent
fbd7c9aa5d
commit
232394e3a8
@@ -18,14 +18,9 @@
 */
package org.elasticsearch.search.aggregations.bucket.significant;

import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.terms.StringTermsAggregator;
@@ -119,71 +114,5 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
        Releasables.close(bucketOrds, termsAggFactory);
    }

    /**
     * Extension of SignificantStringTermsAggregator that caches bucket ords using terms ordinals.
     */
    public static class WithOrdinals extends SignificantStringTermsAggregator {

        private final ValuesSource.Bytes.WithOrdinals valuesSource;
        private BytesValues.WithOrdinals bytesValues;
        private Ordinals.Docs ordinals;
        private LongArray ordinalToBucket;

        public WithOrdinals(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource,
                long estimatedBucketCount, BucketCountThresholds bucketCountThresholds, AggregationContext aggregationContext,
                Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
            super(name, factories, valuesSource, estimatedBucketCount, bucketCountThresholds, null, aggregationContext, parent, termsAggFactory);
            this.valuesSource = valuesSource;
        }

        @Override
        public void setNextReader(AtomicReaderContext reader) {
            bytesValues = valuesSource.bytesValues();
            ordinals = bytesValues.ordinals();
            final long maxOrd = ordinals.getMaxOrd();
            if (ordinalToBucket == null || ordinalToBucket.size() < maxOrd) {
                if (ordinalToBucket != null) {
                    ordinalToBucket.close();
                }
                ordinalToBucket = context().bigArrays().newLongArray(BigArrays.overSize(maxOrd), false);
            }
            ordinalToBucket.fill(0, maxOrd, -1L);
        }

        @Override
        public void collect(int doc, long owningBucketOrdinal) throws IOException {
            assert owningBucketOrdinal == 0 : "this is a per_bucket aggregator";
            numCollectedDocs++;
            final int valuesCount = ordinals.setDocument(doc);

            for (int i = 0; i < valuesCount; ++i) {
                final long ord = ordinals.nextOrd();
                long bucketOrd = ordinalToBucket.get(ord);
                if (bucketOrd < 0) { // unlikely condition on a low-cardinality field
                    final BytesRef bytes = bytesValues.getValueByOrd(ord);
                    final int hash = bytesValues.currentValueHash();
                    assert hash == bytes.hashCode();
                    bucketOrd = bucketOrds.add(bytes, hash);
                    if (bucketOrd < 0) { // already seen in another segment
                        bucketOrd = -1 - bucketOrd;
                        collectExistingBucket(doc, bucketOrd);
                    } else {
                        collectBucket(doc, bucketOrd);
                    }
                    ordinalToBucket.set(ord, bucketOrd);
                } else {
                    collectExistingBucket(doc, bucketOrd);
                }
            }
        }

        @Override
        public void doClose() {
            Releasables.close(bucketOrds, termsAggFactory, ordinalToBucket);
        }

    }

}
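The `bucketOrd = -1 - bucketOrd` step above relies on a convention of `BytesRefHash.add`: it is assumed to return the fresh ordinal for a previously unseen key, and `-1 - existingOrd` for a key that is already present, which matches how both branches of `collect` use the return value. A minimal sketch of that contract, with an arbitrary capacity and BigArrays instance:

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.BytesRefHash;

    class BytesRefHashContractSketch {
        static void demo() {
            BytesRefHash ords = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE);
            BytesRef key = new BytesRef("foo");
            long first = ords.add(key, key.hashCode());  // >= 0: newly inserted, this is its ord
            long second = ords.add(key, key.hashCode()); // < 0: key already present, encoded
            assert second == -1 - first;                 // decode back to the existing ord
            ords.close();                                // BytesRefHash is releasable
        }
    }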
@@ -63,24 +63,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory
            return false;
        }

    },
    ORDINALS(new ParseField("ordinals")) {

        @Override
        Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
                TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
                AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
            if (includeExclude != null) {
                return MAP.create(name, factories, valuesSource, estimatedBucketCount, bucketCountThresholds, includeExclude, aggregationContext, parent, termsAggregatorFactory);
            }
            return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, bucketCountThresholds, aggregationContext, parent, termsAggregatorFactory);
        }

        @Override
        boolean needsGlobalOrdinals() {
            return false;
        }

    },
    GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
@@ -26,9 +26,7 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.collect.Iterators2;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.search.aggregations.Aggregator;
@@ -257,67 +255,5 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
        Releasables.close(bucketOrds);
    }

    /**
     * Extension of StringTermsAggregator that caches bucket ords using terms ordinals.
     */
    public static class WithOrdinals extends StringTermsAggregator {

        private final ValuesSource.Bytes.WithOrdinals valuesSource;
        private BytesValues.WithOrdinals bytesValues;
        private Ordinals.Docs ordinals;
        private LongArray ordinalToBucket;

        public WithOrdinals(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource, long estimatedBucketCount,
                InternalOrder order, BucketCountThresholds bucketCountThresholds, AggregationContext aggregationContext, Aggregator parent, SubAggCollectionMode collectionMode) {
            super(name, factories, valuesSource, estimatedBucketCount, order, bucketCountThresholds, null, aggregationContext, parent, collectionMode);
            this.valuesSource = valuesSource;
        }

        @Override
        public void setNextReader(AtomicReaderContext reader) {
            bytesValues = valuesSource.bytesValues();
            ordinals = bytesValues.ordinals();
            final long maxOrd = ordinals.getMaxOrd();
            if (ordinalToBucket == null || ordinalToBucket.size() < maxOrd) {
                if (ordinalToBucket != null) {
                    ordinalToBucket.close();
                }
                ordinalToBucket = context().bigArrays().newLongArray(BigArrays.overSize(maxOrd), false);
            }
            ordinalToBucket.fill(0, maxOrd, -1L);
        }

        @Override
        public void collect(int doc, long owningBucketOrdinal) throws IOException {
            assert owningBucketOrdinal == 0 : "this is a per_bucket aggregator";
            final int valuesCount = ordinals.setDocument(doc);

            for (int i = 0; i < valuesCount; ++i) {
                final long ord = ordinals.nextOrd();
                long bucketOrd = ordinalToBucket.get(ord);
                if (bucketOrd < 0) { // unlikely condition on a low-cardinality field
                    final BytesRef bytes = bytesValues.getValueByOrd(ord);
                    final int hash = bytesValues.currentValueHash();
                    assert hash == bytes.hashCode();
                    bucketOrd = bucketOrds.add(bytes, hash);
                    if (bucketOrd < 0) { // already seen in another segment
                        bucketOrd = -1 - bucketOrd;
                        collectExistingBucket(doc, bucketOrd);
                    } else {
                        collectBucket(doc, bucketOrd);
                    }
                    ordinalToBucket.set(ord, bucketOrd);
                } else {
                    collectExistingBucket(doc, bucketOrd);
                }
            }
        }

        @Override
        public void doClose() {
            Releasables.close(bucketOrds, ordinalToBucket);
        }
    }

}
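Both `WithOrdinals` variants refill `ordinalToBucket` with `-1` in `setNextReader` because these ordinals are segment-local: the same term generally receives different ordinals in different segments, so a cached mapping is only meaningful for the current segment. A hypothetical two-segment illustration:

    // Hypothetical index whose terms {a, b, c} are spread over two segments:
    //   segment 0 holds [a, c] -> segment-local ords: a=0, c=1
    //   segment 1 holds [b, c] -> segment-local ords: b=0, c=1
    // "a" and "b" collide on ord 0 across segments, so ordinalToBucket must be
    // rebuilt for every segment (hence the fill with -1L in setNextReader).
    // Global ordinals instead assign one ordinal per term across all segments
    // (a=0, b=1, c=2), which is why the global_ordinals mode avoids the
    // per-segment reset that makes this mode costly on high-cardinality fields.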
@@ -50,24 +50,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
            return false;
        }

    },
    ORDINALS(new ParseField("ordinals")) {

        @Override
        Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
                long maxOrd, InternalOrder order, TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
                AggregationContext aggregationContext, Aggregator parent, SubAggCollectionMode subAggCollectMode) {
            if (includeExclude != null) {
                return MAP.create(name, factories, valuesSource, estimatedBucketCount, maxOrd, order, bucketCountThresholds, includeExclude, aggregationContext, parent, subAggCollectMode);
            }
            return new StringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, order, bucketCountThresholds, aggregationContext, parent, subAggCollectMode);
        }

        @Override
        boolean needsGlobalOrdinals() {
            return false;
        }

    },
    GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
@@ -219,7 +219,6 @@ public class RandomTests extends ElasticsearchIntegrationTest {
                .addAggregation(terms("long").field("long_values").size(maxNumTerms).collectMode(randomFrom(SubAggCollectionMode.values())).subAggregation(min("min").field("num")))
                .addAggregation(terms("double").field("double_values").size(maxNumTerms).collectMode(randomFrom(SubAggCollectionMode.values())).subAggregation(max("max").field("num")))
                .addAggregation(terms("string_map").field("string_values").collectMode(randomFrom(SubAggCollectionMode.values())).executionHint(TermsAggregatorFactory.ExecutionMode.MAP.toString()).size(maxNumTerms).subAggregation(stats("stats").field("num")))
                .addAggregation(terms("string_ordinals").field("string_values").collectMode(randomFrom(SubAggCollectionMode.values())).executionHint(TermsAggregatorFactory.ExecutionMode.ORDINALS.toString()).size(maxNumTerms).subAggregation(extendedStats("stats").field("num")))
                .addAggregation(terms("string_global_ordinals").field("string_values").collectMode(randomFrom(SubAggCollectionMode.values())).executionHint(globalOrdinalModes[randomInt(globalOrdinalModes.length - 1)].toString()).size(maxNumTerms).subAggregation(extendedStats("stats").field("num")))
                .addAggregation(terms("string_global_ordinals_doc_values").field("string_values.doc_values").collectMode(randomFrom(SubAggCollectionMode.values())).executionHint(globalOrdinalModes[randomInt(globalOrdinalModes.length - 1)].toString()).size(maxNumTerms).subAggregation(extendedStats("stats").field("num")))
                .execute().actionGet();
@@ -229,22 +228,27 @@
        final Terms longTerms = resp.getAggregations().get("long");
        final Terms doubleTerms = resp.getAggregations().get("double");
        final Terms stringMapTerms = resp.getAggregations().get("string_map");
        final Terms stringOrdinalsTerms = resp.getAggregations().get("string_ordinals");
        final Terms stringGlobalOrdinalsTerms = resp.getAggregations().get("string_global_ordinals");
        final Terms stringGlobalOrdinalsDVTerms = resp.getAggregations().get("string_global_ordinals_doc_values");

        assertEquals(valuesSet.size(), longTerms.getBuckets().size());
        assertEquals(valuesSet.size(), doubleTerms.getBuckets().size());
        assertEquals(valuesSet.size(), stringMapTerms.getBuckets().size());
        assertEquals(valuesSet.size(), stringOrdinalsTerms.getBuckets().size());
        assertEquals(valuesSet.size(), stringGlobalOrdinalsTerms.getBuckets().size());
        assertEquals(valuesSet.size(), stringGlobalOrdinalsDVTerms.getBuckets().size());
        for (Terms.Bucket bucket : longTerms.getBuckets()) {
            final Terms.Bucket doubleBucket = doubleTerms.getBucketByKey(Double.toString(Long.parseLong(bucket.getKeyAsText().string())));
            final Terms.Bucket stringMapBucket = stringMapTerms.getBucketByKey(bucket.getKeyAsText().string());
            final Terms.Bucket stringOrdinalsBucket = stringOrdinalsTerms.getBucketByKey(bucket.getKeyAsText().string());
            final Terms.Bucket stringGlobalOrdinalsBucket = stringGlobalOrdinalsTerms.getBucketByKey(bucket.getKeyAsText().string());
            final Terms.Bucket stringGlobalOrdinalsDVBucket = stringGlobalOrdinalsDVTerms.getBucketByKey(bucket.getKeyAsText().string());
            assertNotNull(doubleBucket);
            assertNotNull(stringMapBucket);
            assertNotNull(stringOrdinalsBucket);
            assertNotNull(stringGlobalOrdinalsBucket);
            assertNotNull(stringGlobalOrdinalsDVBucket);
            assertEquals(bucket.getDocCount(), doubleBucket.getDocCount());
            assertEquals(bucket.getDocCount(), stringMapBucket.getDocCount());
            assertEquals(bucket.getDocCount(), stringOrdinalsBucket.getDocCount());
            assertEquals(bucket.getDocCount(), stringGlobalOrdinalsBucket.getDocCount());
            assertEquals(bucket.getDocCount(), stringGlobalOrdinalsDVBucket.getDocCount());
        }
    }