Save memory when numeric terms agg is not top (#55873) (#56454)

Right now all implementations of the `terms` agg allocate a new
`Aggregator` per bucket. This uses a bunch of memory. Exactly how much
isn't clear, but each `Aggregator` ends up making its own objects to read
doc values, which have non-trivial buffers. And it forces all of its
sub-aggregations to do the same. We allocate a new `Aggregator` per
bucket for two reasons (see the sketch after this list):

1. We didn't have an appropriate data structure to track the
   sub-ordinals of each parent bucket.
2. You can only make a single call to `runDeferredCollections(long...)`
   per `Aggregator` which was the only way to delay collection of
   sub-aggregations.

This change switches the method that builds aggregation results from
building them one at a time to building all of the results for the
entire aggregator at the same time.
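In shape, the contract change looks like this (both signatures appear
verbatim in the `Aggregator` diff below):

```java
// Before: one result per call, invoked separately for every owning bucket.
public abstract InternalAggregation buildAggregation(long owningBucketOrd) throws IOException;

// After: all of the results for the requested owning buckets in one pass,
// which lets the aggregator share doc values readers and ordinal bookkeeping.
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
```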

It also adds a fairly simplistic data structure to track the sub-ordinals
for `long`-keyed buckets.
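Conceptually that structure maps an `(owningBucketOrd, value)` pair to a
dense bucket ordinal. The stand-in below is only a sketch: the real
`LongKeyedBucketOrds` is backed by Elasticsearch's big-array hashes, and only
`ordsEnum` and `bucketsInOrd` actually appear in the hunks below, so the rest
of this surface is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative HashMap stand-in for LongKeyedBucketOrds. */
class LongKeyedBucketOrdsSketch {
    private final Map<Long, Map<Long, Long>> ords = new HashMap<>();
    private long nextOrd = 0;

    /** Return the ordinal for this pair, allocating a new one if needed. */
    long add(long owningBucketOrd, long value) {
        return ords.computeIfAbsent(owningBucketOrd, k -> new HashMap<>())
            .computeIfAbsent(value, k -> nextOrd++);
    }

    /** How many buckets have been allocated under one owning ordinal. */
    long bucketsInOrd(long owningBucketOrd) {
        Map<Long, Long> perOwner = ords.get(owningBucketOrd);
        return perOwner == null ? 0 : perOwner.size();
    }
}
```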

It uses both of those to power numeric `terms` aggregations and removes
the per-bucket allocation of their `Aggregator`. This fairly
substantially reduces memory consumption of numeric `terms` aggregations
that are not the "top level", especially when those aggregations contain
many sub-aggregations. It is also a pretty big speedup, especially when
the aggregation is under a non-selective aggregation like
`date_histogram`.

I picked numeric `terms` aggregations because those have the simplest
implementation. At least, I could kind of fit it in my head. And I
haven't fully understood the "bytes"-based terms aggregations, but I
imagine I'll be able to make similar optimizations to them in follow-up
changes.
Nik Everett 2020-05-08 20:38:53 -04:00 committed by GitHub
parent 0fb9bc5379
commit 2f38aeb5e2
64 changed files with 1384 additions and 512 deletions

View File

@ -45,9 +45,9 @@ public class ChildrenToParentAggregator extends ParentJoinAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalParent(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -41,9 +41,9 @@ public class ParentToChildrenAggregator extends ParentJoinAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalChildren(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalChildren(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -125,7 +125,7 @@ public class AggregationPhase implements SearchPhase {
for (Aggregator aggregator : context.aggregations().aggregators()) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildAggregation(0));
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
@ -137,5 +137,4 @@ public class AggregationPhase implements SearchPhase {
context.aggregations(null);
context.queryCollectors().remove(AggregationPhase.class);
}
}

View File

@ -152,9 +152,24 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
}
/**
* Build an aggregation for data that has been collected into {@code bucket}.
* Build the results of this aggregation.
* @param owningBucketOrds the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation buildAggregation(long bucket) throws IOException;
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
/**
* Build the result of this aggregation if it is at the "top level"
* of the aggregation tree. If, instead, it is a sub-aggregation of
* another aggregation then the aggregation that contains it will call
* {@link #buildAggregations(long[])}.
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
return buildAggregations(new long[] {0})[0];
}
/**
* Build an empty aggregation.

View File

@ -146,14 +146,27 @@ public abstract class AggregatorFactory {
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (bucket < aggregators.size()) {
Aggregator aggregator = aggregators.get(bucket);
if (aggregator != null) {
return aggregator.buildAggregation(0);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
if (owningBucketOrds[ordIdx] < aggregators.size()) {
Aggregator aggregator = aggregators.get(owningBucketOrds[ordIdx]);
if (aggregator != null) {
/*
* This is the same call as buildTopLevel but since
* this aggregator may not be the top level we don't
* call that method here. It'd be weird sounding. And
* it'd trip assertions. Both bad.
*/
results[ordIdx] = aggregator.buildAggregations(new long [] {0})[0];
} else {
results[ordIdx] = buildEmptyAggregation();
}
} else {
results[ordIdx] = buildEmptyAggregation();
}
}
return buildEmptyAggregation();
return results;
}
@Override
@ -232,7 +245,9 @@ public abstract class AggregatorFactory {
* Utility method. Given an {@link AggregatorFactory} that creates
* {@link Aggregator}s that only know how to collect bucket {@code 0}, this
* returns an aggregator that can collect any bucket.
* @deprecated implement the aggregator to handle many owning buckets
*/
@Deprecated
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory, final SearchContext searchContext,
final Aggregator parent) throws IOException {
final Aggregator first = factory.create(searchContext, parent, true);

View File

@ -363,5 +363,4 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
// subclasses will override this with a real implementation if you can sort on a descendant
throw new IllegalArgumentException("Can't sort by a descendant of a [" + getType() + "] aggregation [" + head + "]");
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import java.io.IOException;
import java.util.stream.Stream;
@ -73,9 +74,33 @@ public abstract class LeafBucketCollector implements LeafCollector {
}
/**
* Collect the given doc in the given bucket.
* Collect the given {@code doc} in the bucket owned by
* {@code owningBucketOrd}.
* <p>
* The implementation of this method for metric aggregations is generally
* something along the lines of
* <pre>{@code
* array[owningBucketOrd] += loadValueFromDoc(doc)
* }</pre>
* <p>Bucket aggregations have more trouble because their job is to
* <strong>make</strong> new ordinals. So their implementation generally
* looks kind of like
* <pre>{@code
* long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
* collectBucket(doc, myBucketOrd);
* }</pre>
* <p>
* Some bucket aggregations "know" how many ordinals each owning ordinal
* needs so they can map "densely". The {@code range} aggregation, for
* example, can perform this mapping with something like:
* <pre>{@code
* return rangeCount * owningBucketOrd + matchingRange(value);
* }</pre>
* Other aggregations don't know how many buckets will fall into any
* particular owning bucket. The {@code terms} aggregation, for example,
* uses {@link LongKeyedBucketOrds} which amounts to a hash lookup.
*/
public abstract void collect(int doc, long bucket) throws IOException;
public abstract void collect(int doc, long owningBucketOrd) throws IOException;
@Override
public final void collect(int doc) throws IOException {
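For contrast with the bucket pattern described in the javadoc above, here is
a hedged sketch of the metric pattern, loosely modeled on how sum-like
aggregators collect. The `valuesSource`, `sums`, and `bigArrays` fields are
assumptions, not lifted from this diff:

```java
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
    SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
    return new LeafBucketCollectorBase(sub, values) {
        @Override
        public void collect(int doc, long owningBucketOrd) throws IOException {
            // one accumulator slot per owning bucket ordinal
            sums = bigArrays.grow(sums, owningBucketOrd + 1);
            if (values.advanceExact(doc)) {
                for (int i = 0; i < values.docValueCount(); i++) {
                    sums.increment(owningBucketOrd, values.nextValue());
                }
            }
        }
    };
}
```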

View File

@ -34,7 +34,7 @@ import java.util.function.IntConsumer;
/**
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
* in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
* in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
*/
public class MultiBucketConsumerService {
@ -94,7 +94,7 @@ public class MultiBucketConsumerService {
* An {@link IntConsumer} that throws a {@link TooManyBucketsException}
* when the sum of the provided values is above the limit (`search.max_buckets`).
* It is used by aggregators to limit the number of bucket creation during
* {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
* {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
*/
public static class MultiBucketConsumer implements IntConsumer {
private final int limit;

View File

@ -48,7 +48,11 @@ public abstract class NonCollectingAggregator extends AggregatorBase {
}
@Override
public final InternalAggregation buildAggregation(long owningBucketOrdinal) {
return buildEmptyAggregation();
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildEmptyAggregation();
}
return results;
}
}

View File

@ -154,11 +154,10 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
throw new IllegalStateException("Already been replayed");
}
final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long ord : selectedBuckets) {
this.selectedBuckets.add(ord);
}
this.selectedBuckets = hash;
boolean needsScores = scoreMode().needsScores();
Weight weight = null;
@ -185,7 +184,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
final long rebasedBucket = this.selectedBuckets.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (scoreIt.docID() < doc) {
@ -213,19 +212,20 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
public Aggregator wrap(final Aggregator in) {
return new WrappedAggregator(in) {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
long[] rebasedOrds = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]);
if (rebasedOrds[ordIdx] == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
}
}
return in.buildAggregation(rebasedBucket);
return in.buildAggregations(rebasedOrds);
}
};
}

View File

@ -21,9 +21,11 @@ package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorBase;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@ -32,10 +34,16 @@ import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.ToLongFunction;
public abstract class BucketsAggregator extends AggregatorBase {
@ -136,14 +144,205 @@ public abstract class BucketsAggregator extends AggregatorBase {
}
/**
* Required method to build the child aggregations of the given bucket (identified by the bucket ordinal).
* Hook to allow taking an action before building buckets.
*/
protected final InternalAggregations bucketAggregations(long bucket) throws IOException {
final InternalAggregation[] aggregations = new InternalAggregation[subAggregators.length];
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
/**
* Build the results of the sub-aggregations of the buckets at each of
* the provided ordinals.
* <p>
* Most aggregations should probably use something like
* {@link #buildSubAggsForAllBuckets(Object[][], ToLongFunction, BiConsumer)}
* or {@link #buildAggregationsForVariableBuckets(long[], LongHash, BucketBuilderForVariable, Function)}
* or {@link #buildAggregationsForFixedBucketCount(long[], int, BucketBuilderForFixedCount, Function)}
* or {@link #buildAggregationsForSingleBucket(long[], SingleBucketResultBuilder)}
* instead of calling this directly.
* @return the sub-aggregation results in the same order as the provided
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
beforeBuildingBuckets(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildAggregation(bucket);
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
return new InternalAggregations(Arrays.asList(aggregations));
InternalAggregations[] result = new InternalAggregations[bucketOrdsToCollect.length];
for (int ord = 0; ord < bucketOrdsToCollect.length; ord++) {
final int thisOrd = ord;
result[ord] = new InternalAggregations(new AbstractList<InternalAggregation>() {
@Override
public InternalAggregation get(int index) {
return aggregations[index][thisOrd];
}
@Override
public int size() {
return aggregations.length;
}
});
}
return result;
}
/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which buckets they build. They use some mechanism of selecting a list
* of buckets to build and then use this method to "finish" building the results.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForBuckets(B[] buckets,
ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
InternalAggregations[] results = buildSubAggsForBuckets(Arrays.stream(buckets).mapToLong(bucketToOrd).toArray());
for (int i = 0; i < buckets.length; i++) {
setAggs.accept(buckets[i], results[i]);
}
}
/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which buckets they build. They use some mechanism of selecting a list
* of buckets to build and then use this method to "finish" building the results.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForAllBuckets(B[][] buckets,
ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
int totalBucketOrdsToCollect = 0;
for (B[] bucketsForOneResult : buckets) {
totalBucketOrdsToCollect += bucketsForOneResult.length;
}
long[] bucketOrdsToCollect = new long[totalBucketOrdsToCollect];
int s = 0;
for (B[] bucketsForOneResult : buckets) {
for (B bucket : bucketsForOneResult) {
bucketOrdsToCollect[s++] = bucketToOrd.applyAsLong(bucket);
}
}
InternalAggregations[] results = buildSubAggsForBuckets(bucketOrdsToCollect);
s = 0;
for (int r = 0; r < buckets.length; r++) {
for (int b = 0; b < buckets[r].length; b++) {
setAggs.accept(buckets[r][b], results[s++]);
}
}
}
/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which buckets they build. They use some mechanism of selecting a list
* of buckets to build and then use this method to "finish" building the results.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForBuckets(List<B> buckets,
ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
InternalAggregations[] results = buildSubAggsForBuckets(buckets.stream().mapToLong(bucketToOrd).toArray());
for (int i = 0; i < buckets.size(); i++) {
setAggs.accept(buckets.get(i), results[i]);
}
}
/**
* Build aggregation results for an aggregator that has a fixed number of buckets per owning ordinal.
* @param <B> the type of the bucket
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param bucketsPerOwningBucketOrd how many buckets there are per ord
* @param bucketBuilder how to build a bucket
* @param resultBuilder how to build a result from buckets
*/
protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
consumeBucketsAndMaybeBreak(totalBuckets);
long[] bucketOrdsToCollect = new long[totalBuckets];
int bucketOrdIdx = 0;
for (long owningBucketOrd : owningBucketOrds) {
long ord = owningBucketOrd * bucketsPerOwningBucketOrd;
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
bucketOrdsToCollect[bucketOrdIdx++] = ord++;
}
}
bucketOrdIdx = 0;
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
buckets.add(bucketBuilder.build(
offsetInOwningOrd, bucketDocCount(bucketOrdsToCollect[bucketOrdIdx]), subAggregationResults[bucketOrdIdx++]));
}
results[owningOrdIdx] = resultBuilder.apply(buckets);
}
return results;
}
@FunctionalInterface
protected interface BucketBuilderForFixedCount<B> {
B build(int offsetInOwningOrd, int docCount, InternalAggregations subAggregationResults);
}
/**
* Build aggregation results for an aggregator that always contains a single bucket.
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param resultBuilder how to build a result from the sub aggregation results
*/
protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
SingleBucketResultBuilder resultBuilder) throws IOException {
/*
* It'd be entirely reasonable to call
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
* here but we don't because single bucket aggs never have counted
* their buckets against the limit.
*/
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults[ordIdx]);
}
return results;
}
@FunctionalInterface
protected interface SingleBucketResultBuilder {
InternalAggregation build(long owningBucketOrd, InternalAggregations subAggregationResults);
}
/**
* Build aggregation results for an aggregator with a varying number of
* {@code long} keyed buckets that is at the top level or wrapped in
* {@link AggregatorFactory#asMultiBucketAggregator}.
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param bucketOrds hash of values to the bucket ordinal
*/
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
bucketOrdsToCollect[bucketOrd] = bucketOrd;
}
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
List<B> buckets = new ArrayList<>((int) bucketOrds.size());
for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
buckets.add(bucketBuilder.build(bucketOrds.get(bucketOrd), bucketDocCount(bucketOrd), subAggregationResults[bucketOrd]));
}
return new InternalAggregation[] { resultBuilder.apply(buckets) };
}
@FunctionalInterface
protected interface BucketBuilderForVariable<B> {
B build(long bucketValue, int docCount, InternalAggregations subAggregationResults);
}
/**
@ -174,6 +373,9 @@ public abstract class BucketsAggregator extends AggregatorBase {
@Override
public BucketComparator bucketComparator(String key, SortOrder order) {
if (false == this instanceof SingleBucketAggregator) {
return super.bucketComparator(key, order);
}
if (key == null || "doc_count".equals(key)) {
return (lhs, rhs) -> order.reverseMul() * Integer.compare(bucketDocCount(lhs), bucketDocCount(rhs));
}

View File

@ -32,7 +32,10 @@ import java.util.List;
import java.util.Map;
public abstract class DeferableBucketAggregator extends BucketsAggregator {
/**
* Wrapper that records collections. Non-null if any aggregations have
* been deferred.
*/
private DeferringBucketCollector recordingWrapper;
protected DeferableBucketAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
@ -81,13 +84,9 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
/**
* This method should be overridden by subclasses that want to defer
* calculation of a child aggregation until a first pass is complete and a
* set of buckets has been pruned. Deferring collection will require the
* recording of all doc/bucketIds from the first pass and then the sub class
* should call {@link #runDeferredCollections(long...)} for the selected set
* of buckets that survive the pruning.
* set of buckets has been pruned.
*
* @param aggregator
* the child aggregator
* @param aggregator the child aggregator
* @return true if the aggregator should be deferred until a first pass at
* collection has completed
*/
@ -95,12 +94,10 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
return false;
}
protected final void runDeferredCollections(long... bucketOrds) throws IOException {
// Being lenient here - ignore calls where there are no deferred
// collections to playback
@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
if (recordingWrapper != null) {
recordingWrapper.replay(bucketOrds);
recordingWrapper.prepareSelectedBuckets(ordsToCollect);
}
}
}
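Since the hunk above replaces `runDeferredCollections` with the
`beforeBuildingBuckets` hook, the only thing subclasses still customize is
`shouldDefer`. A sketch of a typical override, loosely modeled on how the
terms aggregators decide; the `collectMode` and `aggsUsedForSorting` fields
are assumptions here:

```java
@Override
protected boolean shouldDefer(Aggregator aggregator) {
    // defer sub-aggregations in breadth_first mode, but never the ones the
    // bucket ordering itself depends on
    return collectMode == SubAggCollectionMode.BREADTH_FIRST
        && aggsUsedForSorting.contains(aggregator) == false;
}
```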

View File

@ -44,10 +44,9 @@ public abstract class DeferringBucketCollector extends BucketCollector {
/** Set the deferred collectors. */
public abstract void setDeferredCollector(Iterable<BucketCollector> deferredCollectors);
public final void replay(long... selectedBuckets) throws IOException {
prepareSelectedBuckets(selectedBuckets);
}
/**
* Replay the deferred hits on the selected buckets.
*/
public abstract void prepareSelectedBuckets(long... selectedBuckets) throws IOException;
/**
@ -96,8 +95,8 @@ public abstract class DeferringBucketCollector extends BucketCollector {
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
return in.buildAggregation(bucket);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return in.buildAggregations(owningBucketOrds);
}
@Override

View File

@ -37,7 +37,7 @@ public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollec
super(context, isGlobal);
}
/**
/**
* Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
*
* The mergeMap is an array where the index position represents the current bucket ordinal, and

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -174,41 +175,60 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
int docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned because this aggregation will commonly be used under
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(keys[i],
docCount, bucketAggregations(bucketOrd));
buckets.add(bucket);
consumeBucketsAndMaybeBreak(1);
int maxOrd = owningBucketOrds.length * totalNumKeys;
int totalBucketsToBuild = 0;
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
totalBucketsToBuild++;
}
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrdinal, pos);
consumeBucketsAndMaybeBreak(totalBucketsToBuild);
long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
bucketOrdsToBuild[builtBucketIndex++] = ord;
}
}
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
InternalAggregations[] bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < owningBucketOrds.length; owningBucketOrdIdx++) {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], i);
int docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned due to potential for very sparse matrices
// Empty buckets are not returned because this aggregation will commonly be used under
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(intersectKey,
docCount, bucketAggregations(bucketOrd));
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(keys[i],
docCount, bucketSubAggs[builtBucketIndex++]);
buckets.add(bucket);
consumeBucketsAndMaybeBreak(1);
}
pos++;
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], pos);
int docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned due to potential for very sparse matrices
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(intersectKey,
docCount, bucketSubAggs[builtBucketIndex++]);
buckets.add(bucket);
}
pos++;
}
}
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
}
return new InternalAdjacencyMatrix(name, buckets, metadata());
assert builtBucketIndex == totalBucketsToBuild;
return results;
}
@Override

View File

@ -133,8 +133,9 @@ final class CompositeAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
assert zeroBucket == 0L;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
consumeBucketsAndMaybeBreak(queue.size());
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
@ -143,16 +144,23 @@ final class CompositeAggregator extends BucketsAggregator {
int num = Math.min(size, queue.size());
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
long[] bucketOrdsToCollect = new long[queue.size()];
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = bucketAggregations(slot);
InternalAggregations aggs = subAggsForBuckets[slot];
int docCount = queue.getDocCount(slot);
buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
earlyTerminated, metadata());
return new InternalAggregation[] {
new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
earlyTerminated, metadata())
};
}
@Override

View File

@ -68,8 +68,9 @@ public class FilterAggregator extends BucketsAggregator implements SingleBucketA
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalFilter(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalFilter(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -163,23 +163,15 @@ public class FiltersAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
consumeBucketsAndMaybeBreak(keys.length + (showOtherBucket ? 1 : 0));
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], bucketDocCount(bucketOrd),
bucketAggregations(bucketOrd), keyed);
buckets.add(bucket);
}
// other bucket
if (showOtherBucket) {
long bucketOrd = bucketOrd(owningBucketOrdinal, keys.length);
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(otherBucketKey, bucketDocCount(bucketOrd),
bucketAggregations(bucketOrd), keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed, metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(owningBucketOrds, keys.length + (showOtherBucket ? 1 : 0),
(offsetInOwningOrd, docCount, subAggregationResults) -> {
if (offsetInOwningOrd < keys.length) {
return new InternalFilters.InternalBucket(keys[offsetInOwningOrd], docCount,
subAggregationResults, keyed);
}
return new InternalFilters.InternalBucket(otherBucketKey, docCount, subAggregationResults, keyed);
}, buckets -> new InternalFilters(name, buckets, keyed, metadata()));
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -106,8 +107,8 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
abstract InternalGeoGridBucket newEmptyBucket();
@Override
public InternalGeoGrid buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), shardSize);
consumeBucketsAndMaybeBreak(size);
@ -128,11 +129,10 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final InternalGeoGridBucket bucket = ordered.pop();
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
list[i] = ordered.pop();
}
return buildAggregation(name, requiredSize, Arrays.asList(list), metadata());
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
}
@Override

View File

@ -50,9 +50,11 @@ public class GlobalAggregator extends BucketsAggregator implements SingleBucketA
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0 : "global aggregator can only be a top level aggregator";
return new InternalGlobal(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0: "global aggregator can only be a top level aggregator";
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalGlobal(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata())
);
}
@Override

View File

@ -41,9 +41,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -164,31 +162,21 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
(bucketValue, docCount, subAggregationResults) ->
new InternalAutoDateHistogram.Bucket(bucketValue, docCount, formatter, subAggregationResults),
buckets -> {
// the contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
long[] bucketOrdArray = new long[(int) bucketOrds.size()];
for (int i = 0; i < bucketOrds.size(); i++) {
bucketOrdArray[i] = i;
}
// value source will be null for unmapped fields
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos,
roundingIdx, buildEmptySubAggregations());
runDeferredCollections(bucketOrdArray);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
buckets.add(new InternalAutoDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), formatter, bucketAggregations(i)));
}
// the contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx,
buildEmptySubAggregations());
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1);
});
}
@Override

View File

@ -38,9 +38,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -133,25 +131,22 @@ class DateHistogramAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
(bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
}, buckets -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
List<InternalDateHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
buckets.add(new InternalDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), keyed, formatter, bucketAggregations(i)));
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, metadata());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, metadata());
});
}
@Override

View File

@ -41,7 +41,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -157,25 +156,22 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
(bucketValue, docCount, subAggregationResults) ->
new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults),
buckets -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
List<InternalDateHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
buckets.add(new InternalDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), keyed, formatter, bucketAggregations(i)));
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, metadata());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, metadata());
});
}
@Override

View File

@ -39,9 +39,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -131,24 +129,22 @@ public class NumericHistogramAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
assert bucket == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
List<InternalHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
double roundKey = Double.longBitsToDouble(bucketOrds.get(i));
double key = roundKey * interval + offset;
buckets.add(new InternalHistogram.Bucket(key, bucketDocCount(i), keyed, formatter, bucketAggregations(i)));
}
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
(bucketValue, docCount, subAggregationResults) -> {
double roundKey = Double.longBitsToDouble(bucketValue);
double key = roundKey * interval + offset;
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
}, buckets -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {
emptyBucketInfo = new EmptyBucketInfo(interval, offset, minBound, maxBound, buildEmptySubAggregations());
}
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata());
EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {
emptyBucketInfo = new EmptyBucketInfo(interval, offset, minBound, maxBound, buildEmptySubAggregations());
}
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata());
});
}
@Override

View File

@ -36,11 +36,11 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.EmptyBucketInfo;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -132,27 +132,25 @@ public class RangeHistogramAggregator extends BucketsAggregator {
};
}
// TODO: buildAggregation and buildEmptyAggregation are literally just copied out of NumericHistogramAggregator. We could refactor
// TODO: buildAggregations and buildEmptyAggregation are literally just copied out of NumericHistogramAggregator. We could refactor
// this to an abstract super class, if we wanted to. Might be overkill.
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
assert bucket == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
List<InternalHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
for (long i = 0; i < bucketOrds.size(); i++) {
double roundKey = Double.longBitsToDouble(bucketOrds.get(i));
double key = roundKey * interval + offset;
buckets.add(new InternalHistogram.Bucket(key, bucketDocCount(i), keyed, formatter, bucketAggregations(i)));
}
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds,
(bucketValue, docCount, subAggregationResults) -> {
double roundKey = Double.longBitsToDouble(bucketValue);
double key = roundKey * interval + offset;
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
}, buckets -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
InternalHistogram.EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {
emptyBucketInfo = new InternalHistogram.EmptyBucketInfo(interval, offset, minBound, maxBound, buildEmptySubAggregations());
}
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata());
EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {
emptyBucketInfo = new EmptyBucketInfo(interval, offset, minBound, maxBound, buildEmptySubAggregations());
}
return new InternalHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, metadata());
});
}
@Override

View File

@ -73,8 +73,9 @@ public class MissingAggregator extends BucketsAggregator implements SingleBucket
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalMissing(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalMissing(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -122,11 +122,12 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalNested(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalNested(name, 0, buildEmptySubAggregations(), metadata());
}

View File

@ -91,8 +91,9 @@ public class ReverseNestedAggregator extends BucketsAggregator implements Single
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalReverseNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalReverseNested(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@ -326,16 +325,12 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
consumeBucketsAndMaybeBreak(ranges.length);
List<InternalBinaryRange.Bucket> buckets = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; ++i) {
long bucketOrd = bucket * ranges.length + i;
buckets.add(new InternalBinaryRange.Bucket(format, keyed,
ranges[i].key, ranges[i].from, ranges[i].to,
bucketDocCount(bucketOrd), bucketAggregations(bucketOrd)));
}
return new InternalBinaryRange(name, format, keyed, buckets, metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(owningBucketOrds, ranges.length,
(offsetInOwningOrd, docCount, subAggregationResults) -> {
Range range = ranges[offsetInOwningOrd];
return new InternalBinaryRange.Bucket(format, keyed, range.key, range.from, range.to, docCount, subAggregationResults);
}, buckets -> new InternalBinaryRange(name, format, keyed, buckets, metadata()));
}
@Override

View File

@ -325,19 +325,12 @@ public class RangeAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
consumeBucketsAndMaybeBreak(ranges.length);
List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; i++) {
Range range = ranges[i];
final long bucketOrd = subBucketOrdinal(owningBucketOrdinal, i);
org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),
bucketAggregations(bucketOrd), keyed, format);
buckets.add(bucket);
}
// value source can be null in the case of unmapped fields
return rangeFactory.create(name, buckets, format, keyed, metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(owningBucketOrds, ranges.length,
(offsetInOwningOrd, docCount, subAggregationResults) -> {
Range range = ranges[offsetInOwningOrd];
return rangeFactory.createBucket(range.key, range.from, range.to, docCount, subAggregationResults, keyed, format);
}, buckets -> rangeFactory.create(name, buckets, format, keyed, metadata()));
}
@Override

View File

@ -164,10 +164,9 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
runDeferredCollections(owningBucketOrdinal);
return new InternalSampler(name, bdd == null ? 0 : bdd.getDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal),
metadata());
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
new InternalSampler(name, bdd == null ? 0 : bdd.getDocCount(owningBucketOrd), subAggregationResults, metadata()));
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
@ -80,10 +81,10 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
}
@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
if (valueCount == 0) { // no context in this reader
return buildEmptyAggregation();
return new InternalAggregation[] {buildEmptyAggregation()};
}
final int size;
@ -143,23 +144,20 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
list[i] = ordered.pop();
/*
* The terms are owned by the BytesRefHash which will close after
* we're finished building the aggregation so we need to pull a copy.
*/
list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
}
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
runDeferredCollections(survivingBucketOrds);
for (SignificantStringTerms.Bucket bucket : list) {
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
return new InternalAggregation[] {
new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}
@Override

View File

@ -24,10 +24,12 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.ContextIndexSearcher;
@ -47,7 +49,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
IncludeExclude.LongFilter includeExclude, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, metadata);
SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, false, metadata);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
@ -69,25 +71,27 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
}
@Override
public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long bucketsInOrd = bucketOrds.bucketsInOrd(0);
final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue<SignificantLongTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantLongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
final int docCount = bucketDocCount(i);
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
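            // bucket ords are no longer a dense 0..size() range private to this
            // aggregator, so iterate only the ords that belong to this owning bucket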
while (ordsEnum.next()) {
final int docCount = bucketDocCount(ordsEnum.ord());
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
}
spare.term = bucketOrds.get(i);
spare.term = ordsEnum.value();
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.term);
@ -96,7 +100,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
// Back at the central reducer these properties will be updated with global stats
spare.updateScore(significanceHeuristic);
spare.bucketOrd = i;
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
@ -104,21 +108,16 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
}
SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantLongTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
list[i] = ordered.pop();
}
runDeferredCollections(survivingBucketOrds);
buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs);
for (SignificantLongTerms.Bucket bucket : list) {
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
return new InternalAggregation[] {
new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
@ -73,8 +74,8 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
}
@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
@ -111,25 +112,19 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
}
runDeferredCollections(survivingBucketOrds);
for (SignificantStringTerms.Bucket bucket : list) {
list[i] = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be
// recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
}
return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
return new InternalAggregation[] {
new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}
@Override

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -184,8 +185,8 @@ public class SignificantTextAggregator extends BucketsAggregator {
}
@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
@ -223,16 +224,16 @@ public class SignificantTextAggregator extends BucketsAggregator {
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
list[i] = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
}
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),
return new InternalAggregation[] { new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}

View File

@ -26,6 +26,8 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.internal.SearchContext;
@ -40,9 +42,9 @@ public class DoubleTermsAggregator extends LongTermsAggregator {
DoubleTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
SubAggCollectionMode collectionMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
Map<String, Object> metadata) throws IOException {
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, format, order, bucketCountThresholds, aggregationContext, parent, collectionMode,
showTermDocCountError, longFilter, metadata);
showTermDocCountError, longFilter, collectsFromSingleBucket, metadata);
}
@Override
@ -51,9 +53,8 @@ public class DoubleTermsAggregator extends LongTermsAggregator {
}
@Override
public DoubleTerms buildAggregation(long owningBucketOrdinal) throws IOException {
final LongTerms terms = (LongTerms) super.buildAggregation(owningBucketOrdinal);
return convertToDouble(terms);
protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
return convertToDouble((LongTerms) super.buildResult(otherDocCount, buckets));
}
@Override

View File

@ -165,9 +165,10 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
if (valueCount == 0) { // no context in this reader
return buildEmptyAggregation();
return new InternalAggregation[] {buildEmptyAggregation()};
}
final int size;
@ -215,29 +216,22 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
// Get the top buckets
final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
long survivingBucketOrds[] = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final OrdBucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
BytesRef scratch = new BytesRef();
copy(lookupGlobalOrd.apply(bucket.globalOrd), scratch);
list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0, format);
list[i].bucketOrd = bucket.bucketOrd;
otherDocCount -= list[i].docCount;
list[i].docCountError = 0;
}
//replay any deferred collections
runDeferredCollections(survivingBucketOrds);
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
//Now build the aggs
for (int i = 0; i < list.length; i++) {
StringTerms.Bucket bucket = list[i];
bucket.aggregations = bucket.docCount == 0 ? bucketEmptyAggregations() : bucketAggregations(bucket.bucketOrd);
bucket.docCountError = 0;
}
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
return new InternalAggregation[] {
new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(list), 0);
otherDocCount, Arrays.asList(list), 0)
};
}
/**

View File

@ -0,0 +1,269 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.ObjectArray;
/**
* Maps long bucket keys to bucket ordinals.
*/
public abstract class LongKeyedBucketOrds implements Releasable {
/**
* Build a {@link LongKeyedBucketOrds}.
*/
public static LongKeyedBucketOrds build(BigArrays bigArrays, boolean collectsFromSingleBucket) {
return collectsFromSingleBucket ? new FromSingle(bigArrays) : new FromMany(bigArrays);
}
private LongKeyedBucketOrds() {}
/**
* Add the {@code owningBucketOrd, value} pair. Return the ord for
* their bucket if the pair was not already present, or {@code -1-ord}
* if it was.
*/
public abstract long add(long owningBucketOrd, long value);
/**
* Count the buckets in {@code owningBucketOrd}.
*/
public abstract long bucketsInOrd(long owningBucketOrd);
/**
* Build an iterator for buckets inside {@code owningBucketOrd}.
* <p>
* When this is first returned it is "unpositioned" and you must call
* {@link BucketOrdsEnum#next()} to move it to the first value.
*/
public abstract BucketOrdsEnum ordsEnum(long owningBucketOrd);
/**
* An iterator for buckets inside a particular {@code owningBucketOrd}.
*/
public interface BucketOrdsEnum {
/**
* Advance to the next value.
* @return {@code true} if there *is* a next value,
* {@code false} if there isn't
*/
boolean next();
/**
* The ordinal of the current value.
*/
long ord();
/**
* The current value.
*/
long value();
/**
* A {@linkplain BucketOrdsEnum} that is empty.
*/
BucketOrdsEnum EMPTY = new BucketOrdsEnum() {
@Override
public boolean next() { return false; }
@Override
public long ord() { return 0; }
@Override
public long value() { return 0; }
};
}
/**
* Implementation that only works if it is collecting from a single bucket.
*/
private static class FromSingle extends LongKeyedBucketOrds {
private final LongHash ords;
FromSingle(BigArrays bigArrays) {
ords = new LongHash(1, bigArrays);
}
@Override
public long add(long owningBucketOrd, long value) {
assert owningBucketOrd == 0;
return ords.add(value);
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
assert owningBucketOrd == 0;
return ords.size();
}
@Override
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
assert owningBucketOrd == 0;
return new BucketOrdsEnum() {
private long ord = -1;
private long value;
@Override
public boolean next() {
ord++;
if (ord >= ords.size()) {
return false;
}
value = ords.get(ord);
return true;
}
@Override
public long value() {
return value;
}
@Override
public long ord() {
return ord;
}
};
}
@Override
public void close() {
ords.close();
}
}
/**
* Implementation that works properly when collecting from many buckets.
*/
private static class FromMany extends LongKeyedBucketOrds {
// TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff
private static class Buckets implements Releasable {
private final LongHash valueToThisBucketOrd;
private LongArray thisBucketOrdToGlobalOrd;
Buckets(BigArrays bigArrays) {
valueToThisBucketOrd = new LongHash(1, bigArrays);
thisBucketOrdToGlobalOrd = bigArrays.newLongArray(1, false);
}
@Override
public void close() {
Releasables.close(valueToThisBucketOrd, thisBucketOrdToGlobalOrd);
}
}
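        // Two-level mapping: each owning bucket gets its own LongHash from value to a
        // "local" ord, plus a LongArray from that local ord to a globally unique ord
        // handed out from lastGlobalOrd. Callers only ever see the global ords, so
        // per-bucket machinery like bucketDocCount keeps working unchanged.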
private final BigArrays bigArrays;
private ObjectArray<Buckets> owningOrdToBuckets;
private long lastGlobalOrd = -1;
FromMany(BigArrays bigArrays) {
this.bigArrays = bigArrays;
owningOrdToBuckets = bigArrays.newObjectArray(1);
}
@Override
public long add(long owningBucketOrd, long value) {
Buckets buckets = bucketsForOrd(owningBucketOrd);
long thisBucketOrd = buckets.valueToThisBucketOrd.add(value);
if (thisBucketOrd < 0) {
// Already in the hash
thisBucketOrd = -1 - thisBucketOrd;
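                // map the local ord back to the global ord that was handed out
                // when this pair was first added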
return -1 - buckets.thisBucketOrdToGlobalOrd.get(thisBucketOrd);
}
buckets.thisBucketOrdToGlobalOrd = bigArrays.grow(buckets.thisBucketOrdToGlobalOrd, thisBucketOrd + 1);
lastGlobalOrd++;
buckets.thisBucketOrdToGlobalOrd.set(thisBucketOrd, lastGlobalOrd);
return lastGlobalOrd;
}
private Buckets bucketsForOrd(long owningBucketOrd) {
if (owningOrdToBuckets.size() <= owningBucketOrd) {
owningOrdToBuckets = bigArrays.grow(owningOrdToBuckets, owningBucketOrd + 1);
Buckets buckets = new Buckets(bigArrays);
owningOrdToBuckets.set(owningBucketOrd, buckets);
return buckets;
}
Buckets buckets = owningOrdToBuckets.get(owningBucketOrd);
if (buckets == null) {
buckets = new Buckets(bigArrays);
owningOrdToBuckets.set(owningBucketOrd, buckets);
}
return buckets;
}
@Override
public long bucketsInOrd(long owningBucketOrd) {
if (owningBucketOrd >= owningOrdToBuckets.size()) {
return 0;
}
Buckets buckets = owningOrdToBuckets.get(owningBucketOrd);
if (buckets == null) {
return 0;
}
return buckets.valueToThisBucketOrd.size();
}
@Override
public BucketOrdsEnum ordsEnum(long owningBucketOrd) {
if (owningBucketOrd >= owningOrdToBuckets.size()) {
return BucketOrdsEnum.EMPTY;
}
Buckets buckets = owningOrdToBuckets.get(owningBucketOrd);
if (buckets == null) {
return BucketOrdsEnum.EMPTY;
}
return new BucketOrdsEnum() {
private long thisBucketOrd = -1;
private long value;
private long ord;
@Override
public boolean next() {
thisBucketOrd++;
if (thisBucketOrd >= buckets.valueToThisBucketOrd.size()) {
return false;
}
value = buckets.valueToThisBucketOrd.get(thisBucketOrd);
ord = buckets.thisBucketOrdToGlobalOrd.get(thisBucketOrd);
return true;
}
@Override
public long value() {
return value;
}
@Override
public long ord() {
return ord;
}
};
}
@Override
public void close() {
for (long owningBucketOrd = 0; owningBucketOrd < owningOrdToBuckets.size(); owningBucketOrd++) {
Buckets buckets = owningOrdToBuckets.get(owningBucketOrd);
if (buckets != null) {
buckets.close();
}
}
owningOrdToBuckets.close();
}
}
}
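A usage sketch of the contract above (hypothetical values; bigArrays and collectSomething
stand in for a real BigArrays instance and a real consumer):

    LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, false);
    long first = ords.add(0, 42);          // new pair: returns a fresh ord, e.g. 0
    assert ords.add(0, 42) == -1 - first;  // repeated pair: returns -1-ord
    ords.add(1, 42);                       // same value under another owner gets its own ord
    LongKeyedBucketOrds.BucketOrdsEnum e = ords.ordsEnum(0); // unpositioned until next()
    while (e.next()) {
        collectSomething(e.ord(), e.value()); // hypothetical consumer
    }
    ords.close();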

View File

@ -137,18 +137,13 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
List<LongRareTerms.Bucket> buckets = buildSketch();
runDeferredCollections(buckets.stream().mapToLong(b -> b.bucketOrd).toArray());
// Finalize the buckets
for (LongRareTerms.Bucket bucket : buckets) {
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
CollectionUtil.introSort(buckets, ORDER.comparator());
return new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter);
return new InternalAggregation[] {new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
}
@Override

View File

@ -22,7 +22,6 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -32,11 +31,14 @@ import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyList;
@ -44,19 +46,19 @@ import static java.util.Collections.emptyList;
public class LongTermsAggregator extends TermsAggregator {
protected final ValuesSource.Numeric valuesSource;
protected final LongHash bucketOrds;
protected final LongKeyedBucketOrds bucketOrds;
private boolean showTermDocCountError;
private LongFilter longFilter;
public LongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
Map<String, Object> metadata) throws IOException {
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
this.valuesSource = valuesSource;
this.showTermDocCountError = showTermDocCountError;
this.longFilter = longFilter;
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
@ -77,8 +79,7 @@ public class LongTermsAggregator extends TermsAggregator {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public void collect(int doc, long owningBucketOrd) throws IOException {
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
@ -87,7 +88,7 @@ public class LongTermsAggregator extends TermsAggregator {
final long val = values.nextValue();
if (previous != val || i == 0) {
if ((longFilter == null) || (longFilter.accept(val))) {
long bucketOrdinal = bucketOrds.add(val);
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
@ -105,69 +106,73 @@ public class LongTermsAggregator extends TermsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
if (bucketCountThresholds.getMinDocCount() == 0 && (InternalOrder.isCountDesc(order) == false ||
bucketOrds.size() < bucketCountThresholds.getRequiredSize())) {
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if (values.advanceExact(docId)) {
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; ++i) {
long value = values.nextValue();
if (longFilter == null || longFilter.accept(value)) {
bucketOrds.add(value);
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LongTerms.Bucket[][] topBucketsPerOrd = new LongTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
if (bucketCountThresholds.getMinDocCount() == 0 && (InternalOrder.isCountDesc(order) == false ||
bucketsInOrd < bucketCountThresholds.getRequiredSize())) {
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if (values.advanceExact(docId)) {
final int valueCount = values.docValueCount();
for (int v = 0; v < valueCount; ++v) {
long value = values.nextValue();
if (longFilter == null || longFilter.accept(value)) {
bucketOrds.add(owningBucketOrds[ordIdx], value);
}
}
}
}
}
bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
}
}
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long otherDocCount = 0;
BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
LongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {
spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
}
spare.term = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
otherDocCount += spare.docCount;
spare.bucketOrd = i;
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = ordered.insertWithOverflow(spare);
final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
LongTerms.Bucket spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
}
spare.term = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += spare.docCount;
spare.bucketOrd = ordsEnum.ord();
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
}
}
// Get the top buckets
LongTerms.Bucket[] list = topBucketsPerOrd[ordIdx] = new LongTerms.Bucket[ordered.size()];
for (int b = ordered.size() - 1; b >= 0; --b) {
list[b] = ordered.pop();
list[b].docCountError = 0;
otherDocCounts[ordIdx] -= list[b].docCount;
}
}
// Get the top buckets
final LongTerms.Bucket[] list = new LongTerms.Bucket[ordered.size()];
long survivingBucketOrds[] = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final LongTerms.Bucket bucket = (LongTerms.Bucket) ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
otherDocCount -= bucket.docCount;
}
runDeferredCollections(survivingBucketOrds);
// Now build the aggs
for (int i = 0; i < list.length; i++) {
list[i].aggregations = bucketAggregations(list[i].bucketOrd);
list[i].docCountError = 0;
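        // buildSubAggsForAllBuckets flattens the surviving buckets from every owning
        // ordinal into one batch, builds their sub-aggregation results in a single
        // deferred-collection pass, and attaches them via the setter; one call
        // replaces the old per-owning-bucket runDeferredCollections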
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(otherDocCounts[ordIdx], Arrays.asList(topBucketsPerOrd[ordIdx]));
}
return result;
}
protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
Arrays.asList(list), 0);
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
buckets, 0);
}
@Override
@ -178,7 +183,7 @@ public class LongTermsAggregator extends TermsAggregator {
@Override
public void doClose() {
super.doClose();
Releasables.close(bucketOrds);
}
}

View File

@ -143,19 +143,13 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
List<StringRareTerms.Bucket> buckets = buildSketch();
runDeferredCollections(buckets.stream().mapToLong(b -> b.bucketOrd).toArray());
// Finalize the buckets
for (StringRareTerms.Bucket bucket : buckets) {
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
CollectionUtil.introSort(buckets, ORDER.comparator());
return new StringRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter);
return new InternalAggregation[] {new StringRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
}
@Override

View File

@ -108,8 +108,8 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
if (bucketCountThresholds.getMinDocCount() == 0
&& (InternalOrder.isCountDesc(order) == false
@ -155,26 +155,20 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
// Get the top buckets
final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
long survivingBucketOrds[] = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final StringTerms.Bucket bucket = ordered.pop();
survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
otherDocCount -= bucket.docCount;
}
// replay any deferred collections
runDeferredCollections(survivingBucketOrds);
// Now build the aggs
for (final StringTerms.Bucket bucket : list) {
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
bucket.docCountError = 0;
}
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
return new InternalAggregation[] {
new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
Arrays.asList(list), 0);
Arrays.asList(list), 0)
};
}
@Override

View File

@ -90,7 +90,9 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
assert collectsFromSingleBucket;
ExecutionMode execution = null;
if (executionHint != null) {
@ -124,6 +126,11 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
context, parent, subAggCollectMode, showTermDocCountError, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return true;
}
};
}
@ -146,6 +153,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@ -169,14 +177,19 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
metadata);
collectsFromSingleBucket, metadata);
}
if (includeExclude != null) {
longFilter = includeExclude.convertToLongFilter(format);
}
return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
metadata);
collectsFromSingleBucket, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return false;
}
};
}
@ -236,10 +249,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
TermsAggregationBuilder.NAME);
if (aggregatorSupplier instanceof TermsAggregatorSupplier == false) {
@ -248,6 +257,10 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
TermsAggregatorSupplier termsAggregatorSupplier = (TermsAggregatorSupplier) aggregatorSupplier;
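        // Wrapping in asMultiBucketAggregator is now the supplier's call: only
        // implementations that still need single-bucket collection (the string paths)
        // get wrapped, while the numeric paths return false here and collect from
        // many owning buckets directly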
if (collectsFromSingleBucket == false && termsAggregatorSupplier.needsToCollectFromSingleBucket()) {
return asMultiBucketAggregator(this, searchContext, parent);
}
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
if (InternalOrder.isKeyOrder(order) == false
&& bucketCountThresholds.getShardSize() == TermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
@ -260,7 +273,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
return termsAggregatorSupplier.build(name, factories, valuesSource, order, config.format(),
bucketCountThresholds, includeExclude, executionHint, searchContext, parent, collectMode,
showTermDocCountError, metadata);
showTermDocCountError, collectsFromSingleBucket, metadata);
}
// return the SubAggCollectionMode that this aggregation should use based on the expected size

View File

@ -42,5 +42,8 @@ interface TermsAggregatorSupplier extends AggregatorSupplier {
Aggregator parent,
Aggregator.SubAggCollectionMode subAggCollectMode,
boolean showTermDocCountError,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
boolean needsToCollectFromSingleBucket();
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorBase;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -31,4 +32,19 @@ public abstract class MetricsAggregator extends AggregatorBase {
protected MetricsAggregator(String name, SearchContext context, Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, AggregatorFactories.EMPTY, context, parent, metadata);
}
/**
* Build an aggregation for data that has been collected into
* {@code owningBucketOrd}.
*/
public abstract InternalAggregation buildAggregation(long owningBucketOrd) throws IOException;
@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildAggregation(owningBucketOrds[ordIdx]);
}
return results;
}
}
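A hypothetical fragment (not part of this change) showing the new contract from a
subclass's point of view: the metric implements only the single-ordinal method, and the
final buildAggregations adapter above fans it out over the array of owning ordinals.

    // Inside an imagined MetricsAggregator subclass; `sums` is assumed per-ordinal
    // storage (a DoubleArray grown as owning ordinals are collected).
    @Override
    public InternalAggregation buildAggregation(long owningBucketOrd) {
        double sum = owningBucketOrd < sums.size() ? sums.get(owningBucketOrd) : 0.0;
        return new InternalSum(name, sum, DocValueFormat.RAW, metadata());
    }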

View File

@ -84,16 +84,14 @@ public class ProfilingAggregator extends Aggregator {
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
Timer timer = profileBreakdown.getTimer(AggregationTimingType.BUILD_AGGREGATION);
timer.start();
InternalAggregation result;
try {
result = delegate.buildAggregation(bucket);
return delegate.buildAggregations(owningBucketOrds);
} finally {
timer.stop();
}
return result;
}
@Override

View File

@ -80,7 +80,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(termQuery, collector);
collector.postCollection();
collector.replay(0);
collector.prepareSelectedBuckets(0);
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
@ -94,7 +94,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), collector);
collector.postCollection();
collector.replay(0);
collector.prepareSelectedBuckets(0);
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {

View File

@ -82,7 +82,7 @@ public class GlobalAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
InternalGlobal result = (InternalGlobal) aggregator.buildAggregation(0L);
InternalGlobal result = (InternalGlobal) aggregator.buildTopLevel();
verify.accept(result, (InternalMin) result.getAggregations().asMap().get("in_global"));
indexReader.close();

View File

@ -235,7 +235,7 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalGeoGrid<T>) aggregator.buildAggregation(0L));
verify.accept((InternalGeoGrid<T>) aggregator.buildTopLevel());
indexReader.close();
directory.close();

View File

@ -38,6 +38,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Settings;
@ -55,6 +56,8 @@ import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -62,15 +65,16 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
@ -89,9 +93,12 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.nested;
import static org.hamcrest.Matchers.equalTo;
public class NestedAggregatorTests extends AggregatorTestCase {
@ -103,7 +110,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
private static final String SUM_AGG_NAME = "sumAgg";
private static final String INVERSE_SCRIPT = "inverse";
private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
private static final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
/**
* For each provided field type, we also register an alias with name {@code <field>-alias}.
@ -115,6 +122,14 @@ public class NestedAggregatorTests extends AggregatorTestCase {
Function.identity()));
}
/**
* Nested aggregations need the {@linkplain DirectoryReader} wrapped.
*/
@Override
protected IndexReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
return wrapInMockESDirectoryReader(reader);
}
@Override
protected ScriptService getMockScriptService() {
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
@ -132,7 +147,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
// intentionally not writing any docs
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@ -179,7 +194,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@ -232,7 +247,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT + "." + NESTED_OBJECT2);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME)
@ -290,7 +305,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
iw.addDocuments(documents);
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME)
@ -373,7 +388,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
iw.commit();
iw.close();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
"nested_field");
@ -411,7 +426,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
iw.addDocuments(generateBook("8", new String[]{"f"}, new int[]{12, 14}));
iw.addDocuments(generateBook("9", new String[]{"g", "c", "e"}, new int[]{18, 8}));
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType1.setName("num_pages");
MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
@ -549,7 +564,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
return cmp;
}
});
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType1.setName("num_pages");
MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
@ -647,7 +662,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
iw.addDocuments(documents);
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
TermsAggregationBuilder valueBuilder = new TermsAggregationBuilder("value").userValueTypeHint(ValueType.STRING)
.field("value");
TermsAggregationBuilder keyBuilder = new TermsAggregationBuilder("key").userValueTypeHint(ValueType.STRING)
@ -722,7 +737,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder agg = nested(NESTED_AGG, NESTED_OBJECT).subAggregation(
max(MAX_AGG_NAME).field(VALUE_FIELD_NAME));
NestedAggregationBuilder aliasAgg = nested(NESTED_AGG, NESTED_OBJECT).subAggregation(
@ -765,7 +780,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT)
.subAggregation(new TermsAggregationBuilder("terms").field(VALUE_FIELD_NAME).userValueTypeHint(ValueType.NUMERIC)
.subAggregation(new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME))
@ -799,6 +814,61 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
}
/**
* {@link LongTermsAggregator} is the first complex bucketing aggregation
* that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
* so this tests that nested works properly inside of it.
*/
public void testNestedUnderLongTerms() throws IOException {
int numProducts = scaledRandomIntBetween(1, 100);
int numResellers = scaledRandomIntBetween(1, 100);
AggregationBuilder b = new TermsAggregationBuilder("products").field("product_id").size(numProducts)
.subAggregation(new NestedAggregationBuilder("nested", "nested_reseller")
.subAggregation(new TermsAggregationBuilder("resellers").field("reseller_id").size(numResellers)));
testCase(b, new MatchAllDocsQuery(), buildResellerData(numProducts, numResellers), result -> {
LongTerms products = (LongTerms) result;
assertThat(products.getBuckets().stream().map(LongTerms.Bucket::getKeyAsNumber).collect(toList()),
equalTo(LongStream.range(0, numProducts).mapToObj(Long::valueOf).collect(toList())));
for (int p = 0; p < numProducts; p++) {
LongTerms.Bucket bucket = products.getBucketByKey(Integer.toString(p));
assertThat(bucket.getDocCount(), equalTo(1L));
InternalNested nested = bucket.getAggregations().get("nested");
assertThat(nested.getDocCount(), equalTo((long) numResellers));
LongTerms resellers = nested.getAggregations().get("resellers");
assertThat(resellers.getBuckets().stream().map(LongTerms.Bucket::getKeyAsNumber).collect(toList()),
equalTo(LongStream.range(0, numResellers).mapToObj(Long::valueOf).collect(toList())));
}
}, resellersMappedFields());
}
public static CheckedConsumer<RandomIndexWriter, IOException> buildResellerData(int numProducts, int numResellers) {
return iw -> {
for (int p = 0; p < numProducts; p++) {
List<Document> documents = new ArrayList<>();
generateDocuments(documents, numResellers, p, "nested_reseller", "value");
for (int r = 0; r < documents.size(); r++) {
documents.get(r).add(new SortedNumericDocValuesField("reseller_id", r));
}
Document document = new Document();
document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(p)), IdFieldMapper.Defaults.FIELD_TYPE));
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
document.add(sequenceIDFields.primaryTerm);
document.add(new SortedNumericDocValuesField("product_id", p));
documents.add(document);
iw.addDocuments(documents);
}
};
}
public static MappedFieldType[] resellersMappedFields() {
MappedFieldType productIdField = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
productIdField.setName("product_id");
MappedFieldType resellerIdField = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
resellerIdField.setName("reseller_id");
return new MappedFieldType[] {productIdField, resellerIdField};
}
private double generateMaxDocs(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName))
.max().orElse(Double.NEGATIVE_INFINITY);
@ -808,8 +878,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName)).sum();
}
private double[] generateDocuments(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
private static double[] generateDocuments(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
double[] values = new double[numNestedDocs];
for (int nested = 0; nested < numNestedDocs; nested++) {
Document document = new Document();

View File

@ -33,8 +33,13 @@ import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
@ -45,10 +50,13 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.nested;
import static org.elasticsearch.search.aggregations.AggregationBuilders.reverseNested;
import static org.hamcrest.Matchers.equalTo;
public class ReverseNestedAggregatorTests extends AggregatorTestCase {
@ -68,12 +76,20 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
Function.identity()));
}
/**
* Nested aggregations need the {@linkplain DirectoryReader} wrapped.
*/
@Override
protected IndexReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
return wrapInMockESDirectoryReader(reader);
}
public void testNoDocs() throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
// intentionally not writing any docs
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
ReverseNestedAggregationBuilder reverseNestedBuilder
@ -137,7 +153,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
}
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
ReverseNestedAggregationBuilder reverseNestedBuilder
@ -207,7 +223,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
iw.commit();
}
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
MaxAggregationBuilder maxAgg = max(MAX_AGG_NAME).field(VALUE_FIELD_NAME);
MaxAggregationBuilder aliasMaxAgg = max(MAX_AGG_NAME).field(VALUE_FIELD_NAME + "-alias");
@ -230,4 +246,35 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
}
}
/**
* {@link LongTermsAggregator} is the first complex bucketing aggregation
* that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
* so this tests that nested works properly inside of it.
*/
public void testNestedUnderLongTerms() throws IOException {
int numProducts = scaledRandomIntBetween(1, 100);
int numResellers = scaledRandomIntBetween(1, 100);
AggregationBuilder b = new NestedAggregationBuilder("nested", "nested_reseller")
.subAggregation(new TermsAggregationBuilder("resellers").field("reseller_id").size(numResellers)
.subAggregation(new ReverseNestedAggregationBuilder("reverse_nested")
.subAggregation(new TermsAggregationBuilder("products").field("product_id").size(numProducts))));
testCase(b, new MatchAllDocsQuery(), NestedAggregatorTests.buildResellerData(numProducts, numResellers), result -> {
InternalNested nested = (InternalNested) result;
assertThat(nested.getDocCount(), equalTo((long) numProducts * numResellers));
LongTerms resellers = nested.getAggregations().get("resellers");
assertThat(resellers.getBuckets().stream().map(LongTerms.Bucket::getKeyAsNumber).collect(toList()),
equalTo(LongStream.range(0, numResellers).mapToObj(Long::valueOf).collect(toList())));
for (int r = 0; r < numResellers; r++) {
LongTerms.Bucket bucket = resellers.getBucketByKey(Integer.toString(r));
assertThat(bucket.getDocCount(), equalTo((long) numProducts));
InternalReverseNested reverseNested = bucket.getAggregations().get("reverse_nested");
assertThat(reverseNested.getDocCount(), equalTo((long) numProducts));
LongTerms products = reverseNested.getAggregations().get("products");
assertThat(products.getBuckets().stream().map(LongTerms.Bucket::getKeyAsNumber).collect(toList()),
equalTo(LongStream.range(0, numProducts).mapToObj(Long::valueOf).collect(toList())));
}
}, NestedAggregatorTests.resellersMappedFields());
}
}
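Since LongTermsAggregator no longer goes through asMultiBucketAggregator, a single Aggregator instance now collects for every owning bucket, telling parents apart with LongKeyedBucketOrds. A rough sketch of what that collect path plausibly looks like; collectBucket and collectExistingBucket are the usual BucketsAggregator helpers, while values, sub, and bucketOrds are illustrative names, not the actual source:

    // Sketch (assumption, not the actual implementation): one aggregator handles
    // all owning buckets by mapping (owningBucketOrd, term) to a dense sub-ordinal.
    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        if (values.advanceExact(doc)) {
            long term = values.nextValue();
            long bucketOrd = bucketOrds.add(owningBucketOrd, term);
            if (bucketOrd < 0) {
                // Already seen: add() encodes the existing ordinal as -1 - ord.
                collectExistingBucket(sub, doc, -1 - bucketOrd);
            } else {
                collectBucket(sub, doc, bucketOrd);
            }
        }
    }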

View File

@ -75,7 +75,7 @@ public class BestDocsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(termQuery, collector);
collector.postCollection();
collector.replay(0);
collector.prepareSelectedBuckets(0);
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {

View File

@ -0,0 +1,186 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
public class LongKeyedBucketOrdsTests extends ESTestCase {
private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
public void testExplicitCollectsFromSingleBucket() {
collectsFromSingleBucketCase(LongKeyedBucketOrds.build(bigArrays, true));
}
public void testSurpriseCollectsFromSingleBucket() {
collectsFromSingleBucketCase(LongKeyedBucketOrds.build(bigArrays, false));
}
private void collectsFromSingleBucketCase(LongKeyedBucketOrds ords) {
try {
// Test a few explicit values
assertThat(ords.add(0, 0), equalTo(0L));
assertThat(ords.add(0, 1000), equalTo(1L));
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(0, 1000), equalTo(-2L));
// And some random values
Set<Long> seen = new HashSet<>();
seen.add(0L);
seen.add(1000L);
long[] values = new long[scaledRandomIntBetween(1, 10000)];
for (int i = 0; i < values.length; i++) {
values[i] = randomValueOtherThanMany(seen::contains, ESTestCase::randomLong);
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(0, values[i]), equalTo(i + 2L));
if (randomBoolean()) {
assertThat(ords.add(0, 0), equalTo(-1L));
}
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(0, values[i]), equalTo(-1 - (i + 2L)));
}
// And the explicit values are still ok
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(0, 1000), equalTo(-2L));
// Check counting values
assertThat(ords.bucketsInOrd(0), equalTo(values.length + 2L));
// Check iteration
LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(0);
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(0L));
assertThat(ordEnum.value(), equalTo(0L));
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(1L));
assertThat(ordEnum.value(), equalTo(1000L));
for (int i = 0; i < values.length; i++) {
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(i + 2L));
assertThat(ordEnum.value(), equalTo(values[i]));
}
assertFalse(ordEnum.next());
} finally {
ords.close();
}
}
public void testCollectsFromManyBuckets() {
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, false)) {
// Test a few explicit values
assertThat(ords.add(0, 0), equalTo(0L));
assertThat(ords.add(1, 0), equalTo(1L));
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(1, 0), equalTo(-2L));
// And some random values
Set<OwningBucketOrdAndValue> seen = new HashSet<>();
seen.add(new OwningBucketOrdAndValue(0, 0));
seen.add(new OwningBucketOrdAndValue(1, 0));
OwningBucketOrdAndValue[] values = new OwningBucketOrdAndValue[scaledRandomIntBetween(1, 10000)];
long maxOwningBucketOrd = scaledRandomIntBetween(0, values.length);
for (int i = 0; i < values.length; i++) {
values[i] = randomValueOtherThanMany(seen::contains, () ->
new OwningBucketOrdAndValue(randomLongBetween(0, maxOwningBucketOrd), randomLong()));
seen.add(values[i]);
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
if (randomBoolean()) {
assertThat(ords.add(0, 0), equalTo(-1L));
}
}
for (int i = 0; i < values.length; i++) {
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(-1 - (i + 2L)));
}
// And the explicit values are still ok
assertThat(ords.add(0, 0), equalTo(-1L));
assertThat(ords.add(1, 0), equalTo(-2L));
for (long owningBucketOrd = 0; owningBucketOrd <= maxOwningBucketOrd; owningBucketOrd++) {
long expectedCount = 0;
LongKeyedBucketOrds.BucketOrdsEnum ordEnum = ords.ordsEnum(owningBucketOrd);
if (owningBucketOrd <= 1) {
expectedCount++;
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(owningBucketOrd));
assertThat(ordEnum.value(), equalTo(0L));
}
for (int i = 0; i < values.length; i++) {
if (values[i].owningBucketOrd == owningBucketOrd) {
expectedCount++;
assertTrue(ordEnum.next());
assertThat(ordEnum.ord(), equalTo(i + 2L));
assertThat(ordEnum.value(), equalTo(values[i].value));
}
}
assertFalse(ordEnum.next());
assertThat(ords.bucketsInOrd(owningBucketOrd), equalTo(expectedCount));
}
assertFalse(ords.ordsEnum(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)).next());
assertThat(ords.bucketsInOrd(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)), equalTo(0L));
}
}
private class OwningBucketOrdAndValue {
private final long owningBucketOrd;
private final long value;
OwningBucketOrdAndValue(long owningBucketOrd, long value) {
this.owningBucketOrd = owningBucketOrd;
this.value = value;
}
@Override
public String toString() {
return owningBucketOrd + "/" + value;
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OwningBucketOrdAndValue other = (OwningBucketOrdAndValue) obj;
return owningBucketOrd == other.owningBucketOrd && value == other.value;
}
@Override
public int hashCode() {
return Objects.hash(owningBucketOrd, value);
}
}
}
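The assertions above pin down the LongKeyedBucketOrds contract: add(owningBucketOrd, value) hands out dense ordinals (0, 1, 2, ...) for first-seen pairs and returns -1 - existingOrd for repeats, bucketsInOrd counts a parent's entries, and ordsEnum replays them in insertion order. A minimal usage sketch of that contract:

    // Minimal sketch of the contract exercised by the tests above.
    try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, false)) {
        long first = ords.add(0, 42);   // new (owner, value) pair: dense ord 0
        long repeat = ords.add(0, 42);  // seen before: encoded as -1 - 0 == -1
        if (repeat < 0) {
            repeat = -1 - repeat;       // decode back to the original ordinal
        }
        assert first == repeat;

        // Per-owner iteration yields (ord, value) pairs in insertion order.
        LongKeyedBucketOrds.BucketOrdsEnum e = ords.ordsEnum(0);
        while (e.next()) {
            long ord = e.ord();         // dense ordinal, usable as an array index
            long value = e.value();     // the original key, here 42
        }
    }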

View File

@ -276,7 +276,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
RareTerms result = (RareTerms) aggregator.buildAggregation(0L);
RareTerms result = (RareTerms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
}
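The recurring mechanical edit in these test files replaces buildAggregation(0L) on a top-level aggregator with buildTopLevel(). A plausible shape for that entry point, funneling through the array-based buildAggregations(long[]) API; this is an assumption for illustration, not the actual source:

    // Assumed sketch: build every result for the aggregator at once and return
    // the single top-level one. Top-level aggregators always own bucket ordinal 0.
    public final InternalAggregation buildTopLevel() throws IOException {
        assert parent() == null : "buildTopLevel only makes sense for top-level aggregators";
        return buildAggregations(new long[] { 0 })[0];
    }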
@ -431,7 +431,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType.setHasDocValues(true);
fieldType.setName("nested_value");
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
InternalNested result = searchAndReduce(newIndexSearcher(indexReader),
// match root document only
new DocValuesFieldExistsQuery(PRIMARY_TERM_NAME), nested, fieldType);
@ -470,7 +470,7 @@ public class RareTermsAggregatorTests extends AggregatorTestCase {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType.setHasDocValues(true);
fieldType.setName("nested_value");
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
if (withScore) {

View File

@ -259,7 +259,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(5, result.getBuckets().size());
assertEquals("", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
@ -330,7 +330,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(10, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -367,7 +367,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(5, result.getBuckets().size());
assertEquals("val001", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -391,7 +391,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(8, result.getBuckets().size());
assertEquals("val002", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -420,7 +420,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(2, result.getBuckets().size());
assertEquals("val010", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -437,7 +437,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(2, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -455,7 +455,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(2, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -511,7 +511,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(2, result.getBuckets().size());
assertEquals(0L, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -528,7 +528,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(4, result.getBuckets().size());
assertEquals(1L, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -552,7 +552,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(2, result.getBuckets().size());
assertEquals(0.0, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -569,7 +569,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals(4, result.getBuckets().size());
assertEquals(1.0, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -735,7 +735,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(size, result.getBuckets().size());
for (int i = 0; i < size; i++) {
Map.Entry<T, Integer> expected = expectedBuckets.get(i);
@ -762,7 +762,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = ((Filter) aggregator.buildAggregation(0L)).getAggregations().get("_name2");
result = ((Filter) aggregator.buildTopLevel()).getAggregations().get("_name2");
int expectedFilteredCounts = 0;
for (Integer count : filteredCounts.values()) {
if (count > 0) {
@ -838,7 +838,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(size, result.getBuckets().size());
for (int i = 0; i < size; i++) {
Map.Entry<T, Long> expected = expectedBuckets.get(i);
@ -873,7 +873,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
@ -883,7 +883,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
@ -893,7 +893,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildAggregation(0L);
result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
}
@ -916,7 +916,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
assertFalse(AggregationInspectionHelper.hasValue((InternalTerms)result));
@ -954,7 +954,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(1, result.getBuckets().size());
assertEquals(missingValues[i], result.getBuckets().get(0).getKey());
@ -1035,7 +1035,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
assertEquals(1, result.getBuckets().size());
assertEquals("192.168.100.42", result.getBuckets().get(0).getKey());
@ -1087,7 +1087,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildAggregation(0L);
Terms result = (Terms) aggregator.buildTopLevel();
assertEquals(3, result.getBuckets().size());
assertEquals("a", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -1247,7 +1247,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType.setHasDocValues(true);
fieldType.setName("nested_value");
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
{
InternalNested result = search(newSearcher(indexReader, false, true),
// match root document only
@ -1292,6 +1292,51 @@ public class TermsAggregatorTests extends AggregatorTestCase {
}, fieldType);
}
public void testThreeLayerLong() throws IOException {
try (Directory dir = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
for (int k = 0; k < 10; k++) {
Document d = new Document();
d.add(new SortedNumericDocValuesField("i", i));
d.add(new SortedNumericDocValuesField("j", j));
d.add(new SortedNumericDocValuesField("k", k));
writer.addDocument(d);
}
}
}
try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) {
IndexSearcher searcher = newIndexSearcher(reader);
TermsAggregationBuilder request = new TermsAggregationBuilder("i").field("i")
.subAggregation(new TermsAggregationBuilder("j").field("j")
.subAggregation(new TermsAggregationBuilder("k").field("k")));
LongTerms result = search(searcher, new MatchAllDocsQuery(), request,
longField("i"), longField("j"), longField("k"));
for (int i = 0; i < 10; i++) {
LongTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i));
assertThat(iBucket.getDocCount(), equalTo(100L));
LongTerms jAgg = iBucket.getAggregations().get("j");
for (int j = 0; j < 10; j++) {
LongTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j));
assertThat(jBucket.getDocCount(), equalTo(10L));
LongTerms kAgg = jBucket.getAggregations().get("k");
for (int k = 0; k < 10; k++) {
LongTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k));
assertThat(kBucket.getDocCount(), equalTo(1L));
}
}
}
}
}
}
}
private NumberFieldMapper.NumberFieldType longField(String name) {
NumberFieldMapper.NumberFieldType type = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
type.setName(name);
return type;
}
private void assertNestedTopHitsScore(InternalMultiBucketAggregation<?, ?> terms, boolean withScore) {
assertThat(terms.getBuckets().size(), equalTo(9));
@ -1404,7 +1449,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
searcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
return aggregator.buildAggregation(0L);
return aggregator.buildTopLevel();
}
}

View File

@ -546,7 +546,7 @@ public class AvgAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms terms = (Terms) aggregator.buildAggregation(0L);
Terms terms = (Terms) aggregator.buildTopLevel();
assertNotNull(terms);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertNotNull(buckets);

View File

@ -466,7 +466,7 @@ public class MaxAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Global global = (Global) aggregator.buildAggregation(0L);
Global global = (Global) aggregator.buildTopLevel();
assertNotNull(global);
assertEquals("global", global.getName());
assertEquals(10L, global.getDocCount());
@ -732,7 +732,7 @@ public class MaxAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Global global = (Global) aggregator.buildAggregation(0L);
Global global = (Global) aggregator.buildTopLevel();
assertNotNull(global);
assertEquals("global", global.getName());
assertEquals(0L, global.getDocCount());
@ -775,7 +775,7 @@ public class MaxAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms terms = (Terms) aggregator.buildAggregation(0L);
Terms terms = (Terms) aggregator.buildTopLevel();
assertNotNull(terms);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertNotNull(buckets);
@ -885,17 +885,17 @@ public class MaxAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), bucketCollector);
bucketCollector.postCollection();
InternalMax max = (InternalMax) maxAggregator.buildAggregation(0L);
InternalMax max = (InternalMax) maxAggregator.buildTopLevel();
assertNotNull(max);
assertEquals(12.0, max.getValue(), 0);
assertEquals("max", max.getName());
InternalValueCount count = (InternalValueCount) countAggregator.buildAggregation(0L);
InternalValueCount count = (InternalValueCount) countAggregator.buildTopLevel();
assertNotNull(count);
assertEquals(20L, count.getValue());
assertEquals("count", count.getName());
Terms terms = (Terms) termsAggregator.buildAggregation(0L);
Terms terms = (Terms) termsAggregator.buildTopLevel();
assertNotNull(terms);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertNotNull(buckets);

View File

@ -101,6 +101,7 @@ import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
@ -428,9 +429,9 @@ public abstract class AggregatorTestCase extends ESTestCase {
searcher.search(query, a);
a.postCollection();
@SuppressWarnings("unchecked")
A internalAgg = (A) a.buildAggregation(0L);
InternalAggregationTestCase.assertMultiBucketConsumer(internalAgg, bucketConsumer);
return internalAgg;
A result = (A) a.buildTopLevel();
InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer);
return result;
}
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
@ -498,7 +499,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
a.preCollection();
subSearcher.search(weight, a);
a.postCollection();
InternalAggregation agg = a.buildAggregation(0L);
InternalAggregation agg = a.buildTopLevel();
aggs.add(agg);
InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
}
@ -544,17 +545,17 @@ public abstract class AggregatorTestCase extends ESTestCase {
}
protected <T extends AggregationBuilder, V extends InternalAggregation> void testCase(
T aggregationBuilder,
Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<V> verify,
MappedFieldType... fieldTypes) throws IOException {
T aggregationBuilder,
Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<V> verify,
MappedFieldType... fieldTypes) throws IOException {
try (Directory directory = newDirectory()) {
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
buildIndex.accept(indexWriter);
indexWriter.close();
try (IndexReader indexReader = DirectoryReader.open(directory)) {
try (IndexReader indexReader = wrapDirectoryReader(DirectoryReader.open(directory))) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
V agg = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldTypes);
@ -563,6 +564,14 @@ public abstract class AggregatorTestCase extends ESTestCase {
}
}
/**
* Override to wrap the {@linkplain DirectoryReader} for aggs like
* {@link NestedAggregationBuilder}.
*/
protected IndexReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
return reader;
}
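Subclasses whose aggregations require the Elasticsearch reader wrapper override this hook; ReverseNestedAggregatorTests above does exactly that, repeated here as the canonical usage:

    // Route every reader built by testCase(...) through the mock Elasticsearch
    // wrapper so nested docs resolve correctly.
    @Override
    protected IndexReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
        return wrapInMockESDirectoryReader(reader);
    }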
private static class ShardSearcher extends IndexSearcher {
private final List<LeafReaderContext> ctx;
@ -581,7 +590,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
}
}
protected static DirectoryReader wrap(DirectoryReader directoryReader) throws IOException {
protected static DirectoryReader wrapInMockESDirectoryReader(DirectoryReader directoryReader) throws IOException {
return ElasticsearchDirectoryReader.wrap(directoryReader, new ShardId(new Index("_index", "_na_"), 0));
}

View File

@ -155,7 +155,7 @@ public class HDRPreAggregatedPercentilesAggregatorTests extends AggregatorTestCa
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalHDRPercentiles) aggregator.buildAggregation(0L));
verify.accept((InternalHDRPercentiles) aggregator.buildTopLevel());
}
}

View File

@ -152,7 +152,7 @@ public class TDigestPreAggregatedPercentilesAggregatorTests extends AggregatorTe
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalTDigestPercentiles) aggregator.buildAggregation(0L));
verify.accept((InternalTDigestPercentiles) aggregator.buildTopLevel());
}
}

View File

@ -256,7 +256,7 @@ public class StringStatsAggregatorTests extends AggregatorTestCase {
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms terms = (Terms) aggregator.buildAggregation(0L);
Terms terms = (Terms) aggregator.buildTopLevel();
assertNotNull(terms);
List<? extends Terms.Bucket> buckets = terms.getBuckets();
assertNotNull(buckets);

View File

@ -1357,7 +1357,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
return aggregator.buildAggregation(0L);
return aggregator.buildTopLevel();
} finally {
indexReader.close();
directory.close();

View File

@ -114,7 +114,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();
@ -182,7 +182,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();
@ -238,7 +238,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();
@ -305,7 +305,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();
@ -543,7 +543,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();
@ -621,7 +621,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
CompositeAggregation composite = (CompositeAggregation) aggregator.buildTopLevel();
indexReader.close();
directory.close();

View File

@ -308,7 +308,7 @@ public abstract class GeoShapeGeoGridTestCase<T extends InternalGeoGridBucket<T>
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalGeoGrid<T>) aggregator.buildAggregation(0L));
verify.accept((InternalGeoGrid<T>) aggregator.buildTopLevel());
indexReader.close();
directory.close();