Save memory when rare_terms is not on top (backport of #57948) (#58069)

This uses the optimization that we started making in #55873 for
`rare_terms` to save a bit of memory when that aggregation is not on the
top level.
Nik Everett 2020-06-12 17:47:10 -04:00 committed by GitHub
parent ddf2ee3f23
commit a5571eb1a8
8 changed files with 384 additions and 281 deletions
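Some context on the change itself: before this commit, a `rare_terms` aggregation below the top level was wrapped by asMultiBucketAggregator (see the check removed from RareTermsAggregatorFactory below), which builds a separate aggregator instance, each with its own ordinal hash and its own eagerly created cuckoo filter, for every owning bucket. After this commit a single instance collects for all owning buckets by keying ordinals on the (owning bucket, term) pair through LongKeyedBucketOrds / BytesKeyedBucketOrds. A minimal stand-in sketch of that keying idea, using plain Java maps rather than the real BigArrays-backed Elasticsearch classes:

import java.util.HashMap;
import java.util.Map;

class PairKeyedOrds {
    private final Map<Long, Map<Long, Long>> ords = new HashMap<>();
    private long nextOrd = 0;

    /** Returns a fresh ordinal, or -1 - ord when (owningOrd, value) was already seen. */
    long add(long owningOrd, long value) {
        Map<Long, Long> perOwner = ords.computeIfAbsent(owningOrd, k -> new HashMap<>());
        Long existing = perOwner.get(value);
        if (existing != null) {
            return -1 - existing; // the same "already seen" encoding used below
        }
        perOwner.put(value, nextOrd);
        return nextOrd++;
    }
}

One shared structure replaces one aggregator (and one filter) per owning bucket, which is where the memory saving comes from.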

SetBackedScalingCuckooFilter.java

@@ -39,13 +39,13 @@ import java.util.function.Consumer;
* An approximate set membership datastructure that scales as more unique values are inserted.
* Can definitively say if a member does not exist (no false negatives), but may say an item exists
* when it does not (has false positives). Similar in usage to a Bloom Filter.
*
* <p>
* Internally, the datastructure maintains a Set of hashes up to a specified threshold. This provides
* 100% accurate membership queries.
*
* <p>
* When the threshold is breached, a list of CuckooFilters are created and used to track membership.
* These filters are approximate similar to Bloom Filters.
*
* <p>
* This datastructure scales as more values are inserted by growing the list of CuckooFilters.
* Final size is dependent on the cardinality of data inserted, and the precision specified.
*/
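Since the hunk above only touches the javadoc, here is a minimal usage sketch of SetBackedScalingCuckooFilter for orientation. The constructor, add(...), and registerBreaker(...) calls all appear verbatim later in this diff; mightContain(...) is the membership query used at reduce time, and its exact name should be treated as an assumption here:

import java.util.Random;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;

class CuckooFilterSketch {
    static void demo() {
        // 10000 = threshold under which an exact Set is kept;
        // 0.01 = roughly the tolerated false-positive rate afterwards.
        SetBackedScalingCuckooFilter filter =
            new SetBackedScalingCuckooFilter(10000, new Random(42), 0.01);
        filter.add(123L);
        boolean maybe = filter.mightContain(123L); // may be a false positive
        boolean no = filter.mightContain(456L);    // a false answer is definitive
        System.out.println(maybe + " " + no);
    }
}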

BucketsAggregator.java

@@ -236,23 +236,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
}
}
/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which buckets they build. They use some mechanism of selecting a list
* of buckets to build and then use this method to "finish" building the results.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForBuckets(List<B> buckets,
ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
InternalAggregations[] results = buildSubAggsForBuckets(buckets.stream().mapToLong(bucketToOrd).toArray());
for (int i = 0; i < buckets.size(); i++) {
setAggs.accept(buckets.get(i), results[i]);
}
}
/**
* Build aggregation results for an aggregator that has a fixed number of buckets per owning ordinal.
* @param <B> the type of the bucket

AbstractRareTermsAggregator.java

@@ -24,46 +24,47 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
U extends IncludeExclude.Filter, V> extends DeferableBucketAggregator {
public abstract class AbstractRareTermsAggregator extends DeferableBucketAggregator {
static final BucketOrder ORDER = BucketOrder.compound(BucketOrder.count(true), BucketOrder.key(true)); // sort by count ascending
protected final long maxDocCount;
protected final double precision;
private final double precision;
protected final DocValueFormat format;
protected final T valuesSource;
protected final U includeExclude;
protected final boolean collectsFromSingleBucket;
private final int filterSeed;
MergingBucketsDeferringCollector deferringCollector;
final SetBackedScalingCuckooFilter filter;
protected MergingBucketsDeferringCollector deferringCollector;
AbstractRareTermsAggregator(String name, AggregatorFactories factories, SearchContext context,
Aggregator parent, Map<String, Object> metadata, long maxDocCount, double precision,
DocValueFormat format, T valuesSource, U includeExclude) throws IOException {
AbstractRareTermsAggregator(
String name,
AggregatorFactories factories,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata,
long maxDocCount,
double precision,
DocValueFormat format,
boolean collectsFromSingleBucket
) throws IOException {
super(name, factories, context, parent, metadata);
// We seed the rng with the ShardID so results are deterministic and don't change randomly
this.filter = new SetBackedScalingCuckooFilter(10000, new Random(context.indexShard().shardId().hashCode()), precision);
this.filter.registerBreaker(this::addRequestCircuitBreakerBytes);
this.maxDocCount = maxDocCount;
this.precision = precision;
this.format = format;
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
this.collectsFromSingleBucket = collectsFromSingleBucket;
// We seed the rng with the ShardID so results are deterministic and don't change randomly
this.filterSeed = context.indexShard().shardId().hashCode();
String scoringAgg = subAggsNeedScore();
String nestedAgg = descendsFromNestedAggregator(parent);
if (scoringAgg != null && nestedAgg != null) {
@@ -81,6 +82,12 @@ public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
}
}
protected SetBackedScalingCuckooFilter newFilter() {
SetBackedScalingCuckooFilter filter = new SetBackedScalingCuckooFilter(10000, new Random(filterSeed), precision);
filter.registerBreaker(this::addRequestCircuitBreakerBytes);
return filter;
}
@Override
protected boolean shouldDefer(Aggregator aggregator) {
return true;
@@ -110,21 +117,4 @@ public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
}
return null;
}
protected void doCollect(LeafBucketCollector subCollector, V val, int docId) throws IOException {
long bucketOrdinal = addValueToOrds(val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(subCollector, docId, bucketOrdinal);
} else {
collectBucket(subCollector, docId, bucketOrdinal);
}
}
/**
* Adds the value to the ordinal map. Returns the newly allocated id if it wasn't in the ordinal map yet,
* or <code>-1-id</code> if it was already present.
*/
abstract long addValueToOrds(V value);
}
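The removed doCollect and addValueToOrds above (now inlined into each concrete aggregator) rely on an encoding convention: the ordinal map returns a non-negative ordinal for a first sighting, and -1 - ord when the key already exists. The transform is its own inverse, which a tiny self-contained illustration (not Elasticsearch code) makes concrete:

class OrdinalEncoding {
    // -1 - ord is always negative, so the sign alone says "already seen".
    static long encodeExisting(long ord) {
        return -1 - ord;
    }

    // Applying the same transform again restores the original ordinal.
    static long decodeExisting(long encoded) {
        return -1 - encoded;
    }

    public static void main(String[] args) {
        long ord = 7;
        long encoded = encodeExisting(ord); // -8
        assert encoded < 0 && decodeExisting(encoded) == ord;
    }
}

This is why the collectors below branch on bucketOrdinal < 0: negative means bump an existing bucket's doc count, non-negative means start a new bucket.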

LongRareTermsAggregator.java

@@ -20,9 +20,9 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,6 +34,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -42,15 +43,38 @@ import static java.util.Collections.emptyList;
/**
* An aggregator that finds "rare" long values (e.g. terms agg that orders ascending)
*/
public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesSource.Numeric, IncludeExclude.LongFilter, Long> {
public class LongRareTermsAggregator extends AbstractRareTermsAggregator {
private final ValuesSource.Numeric valuesSource;
private final IncludeExclude.LongFilter filter;
private final LongKeyedBucketOrds bucketOrds;
protected LongHash bucketOrds;
LongRareTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
SearchContext aggregationContext, Aggregator parent, IncludeExclude.LongFilter longFilter,
int maxDocCount, double precision, Map<String, Object> metadata) throws IOException {
super(name, factories, aggregationContext, parent, metadata, maxDocCount, precision, format, valuesSource, longFilter);
this.bucketOrds = new LongHash(1, aggregationContext.bigArrays());
LongRareTermsAggregator(
String name,
AggregatorFactories factories,
ValuesSource.Numeric valuesSource,
DocValueFormat format,
SearchContext aggregationContext,
Aggregator parent,
IncludeExclude.LongFilter filter,
int maxDocCount,
double precision,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
) throws IOException {
super(
name,
factories,
aggregationContext,
parent,
metadata,
maxDocCount,
precision,
format,
collectsFromSingleBucket
);
this.valuesSource = valuesSource;
this.filter = filter;
this.bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
@@ -58,24 +82,31 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedNumericDocValues values = getValues(valuesSource, ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int docId, long owningBucketOrdinal) throws IOException {
if (values.advanceExact(docId)) {
final int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
final long val = values.nextValue();
if (previous != val || i == 0) {
if ((includeExclude == null) || (includeExclude.accept(val))) {
doCollect(sub, val, docId);
}
previous = val;
}
public void collect(int docId, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(docId)) {
return;
}
int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long val = values.nextValue();
if (i > 0 && previous == val) { // values are sorted, so duplicates are adjacent
continue;
}
previous = val;
if (filter != null && false == filter.accept(val)) {
continue;
}
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, docId, bucketOrdinal);
} else {
collectBucket(sub, docId, bucketOrdinal);
}
}
}
@@ -83,70 +114,74 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
}
@Override
long addValueToOrds(Long value) {
return bucketOrds.add(value);
}
/**
* Merges the ordinals to a minimal set, populates the CuckooFilter and
* generates a final set of buckets.
*
* If a term is below the maxDocCount, it is turned into a Bucket. Otherwise,
* the term is added to the filter, and pruned from the ordinal map. If
* necessary the ordinal map is merged down to a minimal set to remove deletions
*/
private List<LongRareTerms.Bucket> buildSketch() {
long deletionCount = 0;
LongHash newBucketOrds = new LongHash(1, context.bigArrays());
List<LongRareTerms.Bucket> buckets = new ArrayList<>();
try (LongHash oldBucketOrds = bucketOrds) {
long[] mergeMap = new long[(int) oldBucketOrds.size()];
for (int i = 0; i < oldBucketOrds.size(); i++) {
long oldKey = oldBucketOrds.get(i);
long newBucketOrd = -1;
long docCount = bucketDocCount(i);
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
newBucketOrd = newBucketOrds.add(oldKey);
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
bucket.bucketOrd = newBucketOrd;
buckets.add(bucket);
} else {
// Make a note when one of the ords has been deleted
deletionCount += 1;
filter.add(oldKey);
}
mergeMap[i] = newBucketOrd;
}
// Only merge/delete the ordinals if we have actually deleted one,
// to save on some redundant work
if (deletionCount > 0) {
mergeBuckets(mergeMap, newBucketOrds.size());
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
/*
* Collect the list of buckets, populate the filter with terms
* that are too frequent, and figure out how to merge sub-buckets.
*/
LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
long keepCount = 0;
long[] mergeMap = new long[(int) bucketOrds.size()];
Arrays.fill(mergeMap, -1);
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, context.bigArrays())) {
filters[owningOrdIdx] = newFilter();
List<LongRareTerms.Bucket> builtBuckets = new ArrayList<>();
LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
while (collectedBuckets.next()) {
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value());
mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd;
builtBuckets.add(bucket);
keepCount++;
} else {
filters[owningOrdIdx].add(collectedBuckets.value());
}
}
rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(new LongRareTerms.Bucket[0]);
offset += bucketsInThisOwningBucketToCollect.size();
}
}
bucketOrds = newBucketOrds;
return buckets;
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
List<LongRareTerms.Bucket> buckets = buildSketch();
buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.length) {
mergeBuckets(mergeMap, offset);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
}
CollectionUtil.introSort(buckets, ORDER.comparator());
return new InternalAggregation[] {new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
result[ordIdx] = new LongRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(rarestPerOrd[ordIdx]),
maxDocCount,
filters[ordIdx]
);
}
return result;
}
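The mergeMap built in buildAggregations above is indexed by old bucket ordinal and holds the compacted new ordinal, with -1 marking terms that were too frequent to be rare and went into the cuckoo filter instead. A hypothetical sketch of the doc-count remapping that mergeBuckets performs (the real logic lives in BucketsAggregator; this only illustrates the contract):

import java.util.Arrays;

class MergeMapSketch {
    static long[] remapDocCounts(long[] oldCounts, long[] mergeMap, int newSize) {
        long[] newCounts = new long[newSize];
        for (int oldOrd = 0; oldOrd < mergeMap.length; oldOrd++) {
            if (mergeMap[oldOrd] >= 0) { // -1 = pruned, its count is dropped
                newCounts[(int) mergeMap[oldOrd]] += oldCounts[oldOrd];
            }
        }
        return newCounts;
    }

    public static void main(String[] args) {
        long[] counts = {1, 5, 2};    // ordinal 1 matched too many docs
        long[] mergeMap = {0, -1, 1}; // keep ordinals 0 and 2, prune 1
        System.out.println(Arrays.toString(remapDocCounts(counts, mergeMap, 2))); // [1, 2]
    }
}

As the in-code comment says, the merge is skipped when keepCount == mergeMap.length, i.e. when no ordinal was deleted and the rewrite would be redundant work.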
@Override
public InternalAggregation buildEmptyAggregation() {
return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, filter);
return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, newFilter());
}
@Override

RareTermsAggregatorFactory.java

@@ -72,6 +72,7 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
IncludeExclude includeExclude,
SearchContext context,
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
ExecutionMode execution = ExecutionMode.MAP; //TODO global ords not implemented yet, only supports "map"
@@ -82,8 +83,19 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
"Use an array of values for include/exclude clauses");
}
return execution.create(name, factories, valuesSource, format,
includeExclude, context, parent, metadata, maxDocCount, precision);
return execution.create(
name,
factories,
valuesSource,
format,
includeExclude,
context,
parent,
metadata,
maxDocCount,
precision,
collectsFromSingleBucket
);
}
};
@@ -105,6 +117,7 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
IncludeExclude includeExclude,
SearchContext context,
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@@ -121,7 +134,7 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
longFilter = includeExclude.convertToLongFilter(format);
}
return new LongRareTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format,
context, parent, longFilter, maxDocCount, precision, metadata);
context, parent, longFilter, maxDocCount, precision, collectsFromSingleBucket, metadata);
}
};
}
@@ -155,10 +168,6 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config,
RareTermsAggregationBuilder.NAME);
if (aggregatorSupplier instanceof RareTermsAggregatorSupplier == false) {
@@ -167,7 +176,7 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
return ((RareTermsAggregatorSupplier) aggregatorSupplier).build(name, factories, config.getValuesSource(), config.format(),
maxDocCount, precision, includeExclude, searchContext, parent, metadata);
maxDocCount, precision, includeExclude, searchContext, parent, collectsFromSingleBucket, metadata);
}
public enum ExecutionMode {
@@ -178,11 +187,11 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource,
DocValueFormat format, IncludeExclude includeExclude,
SearchContext context, Aggregator parent,
Map<String, Object> metadata, long maxDocCount, double precision)
Map<String, Object> metadata, long maxDocCount, double precision, boolean collectsFromSingleBucket)
throws IOException {
final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
return new StringRareTermsAggregator(name, factories, (ValuesSource.Bytes) valuesSource, format, filter,
context, parent, metadata, maxDocCount, precision);
context, parent, metadata, maxDocCount, precision, collectsFromSingleBucket);
}
@Override
@@ -210,7 +219,7 @@ public class RareTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource,
DocValueFormat format, IncludeExclude includeExclude,
SearchContext context, Aggregator parent, Map<String, Object> metadata,
long maxDocCount, double precision)
long maxDocCount, double precision, boolean collectsFromSingleBucket)
throws IOException;
abstract boolean needsGlobalOrdinals();

RareTermsAggregatorSupplier.java

@@ -38,5 +38,6 @@ interface RareTermsAggregatorSupplier extends AggregatorSupplier {
IncludeExclude includeExclude,
SearchContext context,
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
}

StringRareTermsAggregator.java

@@ -21,9 +21,9 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
@@ -36,6 +36,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -44,15 +45,38 @@ import static java.util.Collections.emptyList;
/**
* An aggregator that finds "rare" string values (e.g. terms agg that orders ascending)
*/
public class StringRareTermsAggregator extends AbstractRareTermsAggregator<ValuesSource.Bytes, IncludeExclude.StringFilter, BytesRef> {
protected BytesRefHash bucketOrds;
public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
private final ValuesSource.Bytes valuesSource;
private final IncludeExclude.StringFilter filter;
private final BytesKeyedBucketOrds bucketOrds;
StringRareTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes valuesSource,
DocValueFormat format, IncludeExclude.StringFilter stringFilter,
SearchContext context, Aggregator parent,
Map<String, Object> metadata, long maxDocCount, double precision) throws IOException {
super(name, factories, context, parent, metadata, maxDocCount, precision, format, valuesSource, stringFilter);
this.bucketOrds = new BytesRefHash(1, context.bigArrays());
StringRareTermsAggregator(
String name,
AggregatorFactories factories,
ValuesSource.Bytes valuesSource,
DocValueFormat format,
IncludeExclude.StringFilter filter,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata,
long maxDocCount,
double precision,
boolean collectsFromSingleBucket
) throws IOException {
super(
name,
factories,
context,
parent,
metadata,
maxDocCount,
precision,
format,
collectsFromSingleBucket
);
this.valuesSource = valuesSource;
this.filter = filter;
this.bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
@@ -63,25 +87,30 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
final BytesRefBuilder previous = new BytesRefBuilder();
@Override
public void collect(int docId, long bucket) throws IOException {
assert bucket == 0;
if (values.advanceExact(docId)) {
final int valuesCount = values.docValueCount();
previous.clear();
public void collect(int docId, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(docId)) {
return;
}
int valuesCount = values.docValueCount();
previous.clear();
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && !includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
doCollect(sub, bytes, docId);
previous.copyBytes(bytes);
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
for (int i = 0; i < valuesCount; ++i) {
BytesRef bytes = values.nextValue();
if (filter != null && false == filter.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
previous.copyBytes(bytes);
long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, docId, bucketOrdinal);
} else {
collectBucket(sub, docId, bucketOrdinal);
}
}
}
@@ -89,70 +118,76 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
}
@Override
long addValueToOrds(BytesRef value) {
return bucketOrds.add(value);
}
/**
* Merges the ordinals to a minimal set, populates the CuckooFilter and
* generates a final set of buckets.
*
* If a term is below the maxDocCount, it is turned into a Bucket. Otherwise,
* the term is added to the filter, and pruned from the ordinal map. If
* necessary the ordinal map is merged down to a minimal set to remove deletions
*/
private List<StringRareTerms.Bucket> buildSketch() {
long deletionCount = 0;
BytesRefHash newBucketOrds = new BytesRefHash(1, context.bigArrays());
List<StringRareTerms.Bucket> buckets = new ArrayList<>();
try (BytesRefHash oldBucketOrds = bucketOrds) {
long[] mergeMap = new long[(int) oldBucketOrds.size()];
BytesRef scratch = new BytesRef();
for (int i = 0; i < oldBucketOrds.size(); i++) {
BytesRef oldKey = oldBucketOrds.get(i, scratch);
long newBucketOrd = -1;
long docCount = bucketDocCount(i);
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
newBucketOrd = newBucketOrds.add(oldKey);
StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(oldKey), docCount, null, format);
bucket.bucketOrd = newBucketOrd;
buckets.add(bucket);
} else {
// Make a note when one of the ords has been deleted
deletionCount += 1;
filter.add(oldKey);
}
mergeMap[i] = newBucketOrd;
}
// Only merge/delete the ordinals if we have actually deleted one,
// to save on some redundant work
if (deletionCount > 0) {
mergeBuckets(mergeMap, newBucketOrds.size());
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
/*
* Collect the list of buckets, populate the filter with terms
* that are too frequent, and figure out how to merge sub-buckets.
*/
StringRareTerms.Bucket[][] rarestPerOrd = new StringRareTerms.Bucket[owningBucketOrds.length][];
SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
long keepCount = 0;
long[] mergeMap = new long[(int) bucketOrds.size()];
Arrays.fill(mergeMap, -1);
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(1, context.bigArrays())) {
filters[owningOrdIdx] = newFilter();
List<StringRareTerms.Bucket> builtBuckets = new ArrayList<>();
BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
BytesRef scratch = new BytesRef();
while (collectedBuckets.next()) {
collectedBuckets.readValue(scratch);
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(scratch), docCount, null, format);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(scratch);
mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd;
builtBuckets.add(bucket);
keepCount++;
} else {
filters[owningOrdIdx].add(scratch);
}
}
rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(new StringRareTerms.Bucket[0]);
offset += bucketsInThisOwningBucketToCollect.size();
}
}
bucketOrds = newBucketOrds;
return buckets;
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
List<StringRareTerms.Bucket> buckets = buildSketch();
buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.length) {
mergeBuckets(mergeMap, offset);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
}
CollectionUtil.introSort(buckets, ORDER.comparator());
return new InternalAggregation[] {new StringRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
result[ordIdx] = new StringRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(rarestPerOrd[ordIdx]),
maxDocCount,
filters[ordIdx]
);
}
return result;
}
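One detail worth calling out in the hunk above: collectedBuckets.readValue(scratch) fills a reused scratch BytesRef whose backing array is overwritten on the next iteration, which is why the bucket key is taken via BytesRef.deepCopyOf(scratch) while the short-lived filter insert can use scratch directly. A small illustration of the aliasing hazard with plain Lucene BytesRef (nothing Elasticsearch-specific assumed):

import org.apache.lucene.util.BytesRef;

class ScratchReuse {
    public static void main(String[] args) {
        BytesRef scratch = new BytesRef("abc");
        BytesRef aliased = scratch;                    // shares the byte array
        BytesRef owned = BytesRef.deepCopyOf(scratch); // private copy
        scratch.bytes[0] = (byte) 'x';                 // simulate buffer reuse
        System.out.println(aliased.utf8ToString());    // "xbc" - clobbered
        System.out.println(owned.utf8ToString());      // "abc" - intact
    }
}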
@Override
public InternalAggregation buildEmptyAggregation() {
return new StringRareTerms(name, LongRareTermsAggregator.ORDER, metadata(), format, emptyList(), 0, filter);
return new StringRareTerms(name, LongRareTermsAggregator.ORDER, metadata(), format, emptyList(), 0, newFilter());
}
@Override

RareTermsAggregatorTests.java

@@ -55,9 +55,11 @@ import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@@ -69,7 +71,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
@@ -79,6 +80,7 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -93,9 +95,9 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
private static final List<Long> dataset;
static {
List<Long> d = new ArrayList<>(45);
for (int i = 0; i < 10; i++) {
for (long i = 0; i < 10; i++) {
for (int j = 0; j < i; j++) {
d.add((long) i);
d.add(i);
}
}
dataset = d;
@@ -114,11 +116,11 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
public void testMatchNoDocs() throws IOException {
testBothCases(new MatchNoDocsQuery(), dataset,
aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.STRING
agg -> assertEquals(0, agg.getBuckets().size())
);
testBothCases(new MatchNoDocsQuery(), dataset,
aggregation -> aggregation.field(LONG_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.NUMERIC
agg -> assertEquals(0, agg.getBuckets().size())
);
}
@@ -132,7 +134,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
LongRareTerms.Bucket bucket = (LongRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKey(), equalTo(1L));
assertThat(bucket.getDocCount(), equalTo(1L));
}, ValueType.NUMERIC
}
);
testBothCases(query, dataset,
aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1),
@ -141,7 +143,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKeyAsString(), equalTo("1"));
assertThat(bucket.getDocCount(), equalTo(1L));
}, ValueType.STRING
}
);
}
@@ -164,7 +166,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
LongRareTerms.Bucket bucket = (LongRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKey(), equalTo(0L));
assertThat(bucket.getDocCount(), equalTo(1L));
}, ValueType.NUMERIC
}
);
testSearchAndReduceCase(query, d,
aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1),
@ -173,7 +175,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKeyAsString(), equalTo("0"));
assertThat(bucket.getDocCount(), equalTo(1L));
}, ValueType.STRING
}
);
}
@@ -189,7 +191,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
LongRareTerms.Bucket bucket = (LongRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKey(), equalTo(2L));
assertThat(bucket.getDocCount(), equalTo(2L));
}, ValueType.NUMERIC
}
);
testBothCases(query, dataset,
aggregation -> aggregation.field(KEYWORD_FIELD)
@ -200,7 +202,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0);
assertThat(bucket.getKeyAsString(), equalTo("2"));
assertThat(bucket.getDocCount(), equalTo(2L));
}, ValueType.STRING
}
);
}
@@ -221,7 +223,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
assertThat(children.asList().size(), equalTo(1));
assertThat(children.asList().get(0).getName(), equalTo("the_max"));
assertThat(((Max)(children.asList().get(0))).getValue(), equalTo(1.0));
}, ValueType.NUMERIC
}
);
testBothCases(query, dataset, aggregation -> {
MaxAggregationBuilder max = new MaxAggregationBuilder("the_max").field(LONG_FIELD);
@ -237,7 +239,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
assertThat(children.asList().size(), equalTo(1));
assertThat(children.asList().get(0).getName(), equalTo("the_max"));
assertThat(((Max)(children.asList().get(0))).getValue(), equalTo(1.0));
}, ValueType.STRING
}
);
}
@@ -246,20 +248,20 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
testSearchCase(query, Collections.emptyList(),
aggregation -> aggregation.field(LONG_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.NUMERIC
agg -> assertEquals(0, agg.getBuckets().size())
);
testSearchCase(query, Collections.emptyList(),
aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.STRING
agg -> assertEquals(0, agg.getBuckets().size())
);
testSearchAndReduceCase(query, Collections.emptyList(),
aggregation -> aggregation.field(LONG_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.NUMERIC
agg -> assertEquals(0, agg.getBuckets().size())
);
testSearchAndReduceCase(query, Collections.emptyList(),
aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1),
agg -> assertEquals(0, agg.getBuckets().size()), ValueType.STRING
agg -> assertEquals(0, agg.getBuckets().size())
);
}
@@ -281,11 +283,9 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
ValueType[] valueTypes = new ValueType[]{ValueType.STRING, ValueType.LONG};
String[] fieldNames = new String[]{"string", "long"};
for (int i = 0; i < fieldNames.length; i++) {
RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name")
.userValueTypeHint(valueTypes[i])
.field(fieldNames[i]);
Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType1, fieldType2);
aggregator.preCollection();
@@ -334,7 +334,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
Query query = new MatchAllDocsQuery();
testBothCases(query, dataset, aggregation -> {
TermsAggregationBuilder terms = new TermsAggregationBuilder("the_terms").userValueTypeHint(ValueType.STRING)
TermsAggregationBuilder terms = new TermsAggregationBuilder("the_terms")
.field(KEYWORD_FIELD);
aggregation.field(LONG_FIELD).maxDocCount(1).subAggregation(terms);
},
@@ -349,11 +349,11 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
assertThat(children.asList().get(0).getName(), equalTo("the_terms"));
assertThat(((Terms)(children.asList().get(0))).getBuckets().size(), equalTo(1));
assertThat(((Terms)(children.asList().get(0))).getBuckets().get(0).getKeyAsString(), equalTo("1"));
}, ValueType.NUMERIC
}
);
testBothCases(query, dataset, aggregation -> {
TermsAggregationBuilder terms = new TermsAggregationBuilder("the_terms").userValueTypeHint(ValueType.STRING)
TermsAggregationBuilder terms = new TermsAggregationBuilder("the_terms")
.field(KEYWORD_FIELD);
aggregation.field(KEYWORD_FIELD).maxDocCount(1).subAggregation(terms);
},
@@ -368,10 +368,42 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
assertThat(children.asList().get(0).getName(), equalTo("the_terms"));
assertThat(((Terms)(children.asList().get(0))).getBuckets().size(), equalTo(1));
assertThat(((Terms)(children.asList().get(0))).getBuckets().get(0).getKeyAsString(), equalTo("1"));
}, ValueType.STRING
}
);
}
public void testInsideTerms() throws IOException {
for (boolean reduce : new boolean[] {false, true}) {
for (String field : new String[] {KEYWORD_FIELD, LONG_FIELD}) {
AggregationBuilder builder = new TermsAggregationBuilder("terms").field("even_odd").subAggregation(
new RareTermsAggregationBuilder("rare").field(field).maxDocCount(2));
StringTerms terms = (StringTerms) executeTestCase(reduce, new MatchAllDocsQuery(), dataset, builder);
StringTerms.Bucket even = terms.getBucketByKey("even");
InternalRareTerms<?, ?> evenRare = even.getAggregations().get("rare");
assertEquals(
evenRare.getBuckets().stream().map(InternalRareTerms.Bucket::getKeyAsString).collect(toList()),
org.elasticsearch.common.collect.List.of("2")
);
assertEquals(
evenRare.getBuckets().stream().map(InternalRareTerms.Bucket::getDocCount).collect(toList()),
org.elasticsearch.common.collect.List.of(2L)
);
StringTerms.Bucket odd = terms.getBucketByKey("odd");
InternalRareTerms<?, ?> oddRare = odd.getAggregations().get("rare");
assertEquals(
oddRare.getBuckets().stream().map(InternalRareTerms.Bucket::getKeyAsString).collect(toList()),
org.elasticsearch.common.collect.List.of("1")
);
assertEquals(
oddRare.getBuckets().stream().map(InternalRareTerms.Bucket::getDocCount).collect(toList()),
org.elasticsearch.common.collect.List.of(1L)
);
}
}
}
public void testGlobalAggregationWithScore() throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
@@ -386,15 +418,12 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
indexWriter.addDocument(document);
try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
Aggregator.SubAggCollectionMode collectionMode = randomFrom(Aggregator.SubAggCollectionMode.values());
GlobalAggregationBuilder globalBuilder = new GlobalAggregationBuilder("global")
.subAggregation(
new RareTermsAggregationBuilder("terms")
.userValueTypeHint(ValueType.STRING)
.field("keyword")
.subAggregation(
new RareTermsAggregationBuilder("sub_terms")
.userValueTypeHint(ValueType.STRING)
.field("keyword")
.subAggregation(
new TopHitsAggregationBuilder("top_hits")
@@ -439,7 +468,6 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
NestedAggregationBuilder nested = new NestedAggregationBuilder("nested", "nested_object")
.subAggregation(new RareTermsAggregationBuilder("terms")
.userValueTypeHint(ValueType.LONG)
.field("nested_value")
.maxDocCount(1)
);
@@ -473,7 +501,6 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
for (boolean withScore : new boolean[]{true, false}) {
NestedAggregationBuilder nested = new NestedAggregationBuilder("nested", "nested_object")
.subAggregation(new RareTermsAggregationBuilder("terms")
.userValueTypeHint(ValueType.LONG)
.field("nested_value")
.maxDocCount(2)
.subAggregation(
@@ -540,23 +567,32 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
return documents;
}
private void testSearchCase(Query query, List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms> verify, ValueType valueType) throws IOException {
executeTestCase(false, query, dataset, configure, verify, valueType);
private void testSearchCase(
Query query,
List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms<?, ?>> verify
) throws IOException {
executeTestCase(false, query, dataset, configure, verify);
}
private void testSearchAndReduceCase(Query query, List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms> verify, ValueType valueType) throws IOException {
executeTestCase(true, query, dataset, configure, verify, valueType);
private void testSearchAndReduceCase(
Query query,
List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms<?, ?>> verify
) throws IOException {
executeTestCase(true, query, dataset, configure, verify);
}
private void testBothCases(Query query, List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms> verify, ValueType valueType) throws IOException {
testSearchCase(query, dataset, configure, verify, valueType);
testSearchAndReduceCase(query, dataset, configure, verify, valueType);
private void testBothCases(
Query query,
List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms<?, ?>> verify
) throws IOException {
testSearchCase(query, dataset, configure, verify);
testSearchAndReduceCase(query, dataset, configure, verify);
}
@Override
@@ -575,8 +611,26 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
private void executeTestCase(boolean reduced, Query query, List<Long> dataset,
Consumer<RareTermsAggregationBuilder> configure,
Consumer<InternalMappedRareTerms> verify, ValueType valueType) throws IOException {
Consumer<InternalMappedRareTerms<?, ?>> verify) throws IOException {
RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name");
if (configure != null) {
configure.accept(aggregationBuilder);
}
InternalMappedRareTerms<?, ?> result = (InternalMappedRareTerms<?, ?>) executeTestCase(
reduced,
query,
dataset,
aggregationBuilder
);
verify.accept(result);
}
private InternalAggregation executeTestCase(
boolean reduced,
Query query,
List<Long> dataset,
AggregationBuilder aggregationBuilder
) throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
@@ -590,6 +644,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
document.add(new SortedNumericDocValuesField(LONG_FIELD, value));
document.add(new LongPoint(LONG_FIELD, value));
document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(Long.toString(value))));
document.add(new SortedSetDocValuesField("even_odd", new BytesRef(value % 2 == 0 ? "even" : "odd")));
indexWriter.addDocument(document);
document.clear();
}
@@ -598,12 +653,6 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name");
aggregationBuilder.userValueTypeHint(valueType);
if (configure != null) {
configure.accept(aggregationBuilder);
}
MappedFieldType keywordFieldType = new KeywordFieldMapper.KeywordFieldType();
keywordFieldType.setName(KEYWORD_FIELD);
keywordFieldType.setHasDocValues(true);
@@ -612,13 +661,14 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
longFieldType.setName(LONG_FIELD);
longFieldType.setHasDocValues(true);
InternalMappedRareTerms rareTerms;
MappedFieldType[] types = new MappedFieldType[] {
keywordField(KEYWORD_FIELD),
longField(LONG_FIELD),
keywordField("even_odd")};
if (reduced) {
rareTerms = searchAndReduce(indexSearcher, query, aggregationBuilder, keywordFieldType, longFieldType);
} else {
rareTerms = search(indexSearcher, query, aggregationBuilder, keywordFieldType, longFieldType);
return searchAndReduce(indexSearcher, query, aggregationBuilder, types);
}
verify.accept(rareTerms);
return search(indexSearcher, query, aggregationBuilder, types);
}
}
}