Increase search.max_buckets to 65,535 (#57042)
Increases the default search.max_buckets limit to 65,535, and only counts buckets during the reduce phase. Closes #51731
This commit is contained in:
parent e0a15e8dc4
commit 8d7f389f3a

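For context: `search.max_buckets` is a dynamic cluster setting, so the new 65,535 default can still be raised or lowered at runtime. A minimal sketch of doing that with the Java high-level REST client follows; the client instance, the chosen value, and the helper name are illustrative assumptions rather than part of this commit.

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class MaxBucketsSettingSketch {
    // Hypothetical helper: persistently overrides the search.max_buckets default.
    static void setMaxBuckets(RestHighLevelClient client, int newLimit) throws Exception {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        // The setting is declared with Setting.Property.Dynamic, so no node restart is needed.
        request.persistentSettings(Settings.builder().put("search.max_buckets", newLimit).build());
        client.cluster().putSettings(request, RequestOptions.DEFAULT);
    }
}
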
@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke

 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
+<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
 requests that try to return more than the limit will fail with an exception.

 include::bucket/adjacency-matrix-aggregation.asciidoc[]

@@ -35,10 +35,10 @@ import java.util.function.IntConsumer;
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

@@ -102,6 +102,7 @@ public class MultiBucketConsumerService {

         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;

         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;

@@ -110,15 +111,17 @@ public class MultiBucketConsumerService {

         @Override
         public void accept(int value) {
+            if (value != 0) {
                 count += value;
                 if (count > limit) {
                     throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
                         + "] but was [" + count + "]. This limit can be set by changing the [" +
                         MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
                 }
-            // check parent circuit breaker every 1024 buckets
-            if (value > 0 && (count & 0x3FF) == 0) {
+            }
+            // check parent circuit breaker every 1024 calls
+            callCount++;
+            if ((callCount & 0x3FF) == 0) {
                 breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
             }
         }

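The `(callCount & 0x3FF) == 0` guard above fires on every 1024th call, because 0x3FF masks the low ten bits of the counter. A small standalone sketch (not part of the commit) demonstrating that cadence:

public class BreakerCadenceSketch {
    public static void main(String[] args) {
        int callCount = 0;
        int checks = 0;
        for (int i = 0; i < 5000; i++) {
            callCount++;
            // Same mask as MultiBucketConsumer: low 10 bits all zero means a multiple of 1024.
            if ((callCount & 0x3FF) == 0) {
                checks++;
            }
        }
        // Prints 4: the check ran at call 1024, 2048, 3072 and 4096.
        System.out.println(checks);
    }
}
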
@@ -57,12 +57,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
                     Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }

     /**

@@ -91,7 +91,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that we are passing 0 as a bucket count.
+            multiBucketConsumer.accept(0);
+        }
         subCollector.collect(doc, bucketOrd);
     }

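With this change a shard-level aggregator no longer adds to the global bucket count while collecting; it only calls accept(0) the first time a bucket ordinal is seen, which is enough to let the parent circuit breaker trip if memory runs short. A standalone sketch of that "first sighting" check, using a plain array as a stand-in for the BigArrays-backed docCounts:

public class FirstSightingSketch {
    public static void main(String[] args) {
        long[] docCounts = new long[4];   // stand-in for docCounts (a BigArrays IntArray)
        int breakerPings = 0;
        int[] hits = {0, 1, 0, 2, 1, 0};  // documents landing in bucket ordinals
        for (int ord : hits) {
            // mirrors docCounts.increment(bucketOrd, 1) == 1: true only on the first hit
            if (++docCounts[ord] == 1) {
                breakerPings++;           // the real code calls multiBucketConsumer.accept(0) here
            }
        }
        System.out.println(breakerPings); // 3 distinct ordinals were touched, so 3 pings
    }
}
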
@@ -137,14 +142,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
         }
     }

-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
     /**
      * Hook to allow taking an action before building buckets.
      */

@@ -267,7 +264,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
             BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {

@@ -328,7 +324,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;

@@ -360,7 +355,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                 + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {

@@ -184,7 +184,6 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {

@@ -196,12 +196,10 @@ public class InternalAdjacencyMatrix
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());

@@ -136,7 +136,6 @@ final class CompositeAggregator extends BucketsAggregator {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();

@@ -193,7 +193,6 @@ public class InternalComposite
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;

@@ -207,7 +206,6 @@ public class InternalComposite
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }

@@ -220,6 +218,7 @@ public class InternalComposite
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }

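The same pattern repeats across the Internal* reduce implementations below: the per-bucket consumeBucketsAndMaybeBreak(1) calls and the negative corrections disappear, and the reduced bucket count is reported once, after the result list is final. A hypothetical sketch of that shape; the names and the pass-through reduction are assumptions, not Elasticsearch API:

import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class ReducePhaseCountingSketch {
    // Reduce first, then report the final number of buckets to the consumer exactly once,
    // the way the reduce methods now end with consumeBucketsAndMaybeBreak(result.size()).
    static <B> List<B> reduceAndCount(List<B> candidates, IntConsumer bucketConsumer) {
        List<B> reduced = new ArrayList<>(candidates); // real implementations merge and filter here
        bucketConsumer.accept(reduced.size());
        return reduced;
    }
}
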
@@ -109,7 +109,6 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);

             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;

@@ -100,18 +100,14 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket>
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }

@@ -328,7 +328,6 @@ public final class InternalAutoDateHistogram extends
                 if (reduceRounding.round(top.current.key) != key) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
                     currentBuckets.clear();
                     key = reduceRounding.round(top.current.key);

@@ -348,7 +347,6 @@ public final class InternalAutoDateHistogram extends

             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
             }
         }

@@ -376,22 +374,17 @@ public final class InternalAutoDateHistogram extends
             long roundedBucketKey = reduceRounding.round(bucket.key);
             if (Double.isNaN(key)) {
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else if (roundedBucketKey == key) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             }
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         reducedBuckets = mergedBuckets;

@@ -449,7 +442,6 @@ public final class InternalAutoDateHistogram extends
             if (lastBucket != null) {
                 long key = rounding.nextRoundingValue(lastBucket.key);
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                     key = rounding.nextRoundingValue(key);
                 }

@@ -515,7 +507,7 @@ public final class InternalAutoDateHistogram extends
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
             this.bucketInfo.emptySubAggregations);

@@ -551,16 +543,13 @@ public final class InternalAutoDateHistogram extends
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);

@@ -329,10 +329,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;

@@ -353,10 +350,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }

@@ -396,7 +390,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 long key = bounds.getMin() + offset;
                 long max = bounds.getMax() + offset;
                 while (key <= max) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }

@@ -406,7 +399,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 long key = bounds.getMin() + offset;
                 if (key < firstBucket.key) {
                     while (key < firstBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }

@@ -422,7 +414,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 if (lastBucket != null) {
                     long key = nextKey(lastBucket.key).longValue();
                     while (key < nextBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }

@@ -436,7 +427,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 long key = nextKey(lastBucket.key).longValue();
                 long max = bounds.getMax() + offset;
                 while (key <= max) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }

@@ -462,6 +452,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
             format, keyed, getMetadata());
     }

@@ -313,10 +313,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                     // Using Double.compare instead of != to handle NaN correctly.
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;

@@ -337,10 +334,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }

@@ -380,7 +374,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (iter.hasNext() == false) {
                 // fill with empty buckets
                 for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 }
             } else {

@@ -388,7 +381,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                 if (Double.isFinite(emptyBucketInfo.minBound)) {
                     // fill with empty buckets until the first key
                     for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     }
                 }

@@ -401,7 +393,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                     if (lastBucket != null) {
                         double key = nextKey(lastBucket.key);
                         while (key < nextBucket.key) {
-                            reduceContext.consumeBucketsAndMaybeBreak(1);
                             iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                             key = nextKey(key);
                         }

@@ -412,7 +403,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte

                 // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
                 for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 }
             }

@@ -437,6 +427,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
     }

@@ -543,9 +543,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
                     }
                     updateBucket(spare, globalOrd, bucketOrd, docCount);
                     spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
                 }
             }
         });

@@ -113,8 +113,6 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
                 LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
                 bucket.bucketOrd = newBucketOrd;
                 buckets.add(bucket);
-
-                consumeBucketsAndMaybeBreak(1);
             } else {
                 // Make a note when one of the ords has been deleted
                 deletionCount += 1;

@@ -172,9 +172,6 @@ public class NumericTermsAggregator extends TermsAggregator {
                     if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
                         updateBucket(spare, ordsEnum, docCount);
                         spare = ordered.insertWithOverflow(spare);
-                        if (spare == null) {
-                            consumeBucketsAndMaybeBreak(1);
-                        }
                     }
                 }

@@ -104,9 +104,6 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {

                 spare.bucketOrd = i;
                 spare = ordered.insertWithOverflow(spare);
-                if (spare == null) {
-                    consumeBucketsAndMaybeBreak(1);
-                }
             }

             final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

@@ -216,9 +216,6 @@ public class SignificantTextAggregator extends BucketsAggregator {

                 spare.bucketOrd = i;
                 spare = ordered.insertWithOverflow(spare);
-                if (spare == null) {
-                    consumeBucketsAndMaybeBreak(1);
-                }
             }

             final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

@@ -119,8 +119,6 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
                 StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(oldKey), docCount, null, format);
                 bucket.bucketOrd = newBucketOrd;
                 buckets.add(bucket);
-
-                consumeBucketsAndMaybeBreak(1);
             } else {
                 // Make a note when one of the ords has been deleted
                 deletionCount += 1;

@@ -147,9 +147,6 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
                 spare.bucketOrd = i;
                 if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
                     spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
                 }
             }

@@ -323,7 +323,10 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {

             // make sure used bytes is greater than the total circuit breaker limit
             breaker.addWithoutBreaking(200);
-
+            // make sure that we check on the the following call
+            for (int i = 0; i < 1023; i++) {
+                multiBucketConsumer.accept(0);
+            }
             CircuitBreakingException exception =
                 expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024));
             assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));

@@ -36,8 +36,10 @@ import org.elasticsearch.common.geo.GeoBoundingBoxTests;
 import org.elasticsearch.common.geo.GeoUtils;
 import org.elasticsearch.index.mapper.GeoPointFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

@@ -291,4 +293,11 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
         indexReader.close();
         directory.close();
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

@@ -38,6 +38,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;

@@ -869,4 +870,11 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
             i += 1;
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

@@ -38,10 +38,8 @@ import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

@@ -967,71 +965,6 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
         );
     }

-    public void testMaxBucket() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.fixedInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .fixedInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-    }
-
-    public void testMaxBucketDeprecated() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .dateHistogramInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-        assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
-    }
-
     public void testFixedWithCalendar() throws IOException {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(new MatchAllDocsQuery(),
             Arrays.asList(

@@ -36,7 +36,9 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

 import java.io.IOException;

@@ -304,4 +306,11 @@ public class DateRangeAggregatorTests extends AggregatorTestCase {
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

@@ -427,7 +427,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
         a.postCollection();
         @SuppressWarnings("unchecked")
         A result = (A) a.buildTopLevel();
-        InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer);
         return result;
     }

@@ -498,7 +497,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
             a.postCollection();
             InternalAggregation agg = a.buildTopLevel();
             aggs.add(agg);
-            InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
         }
         if (aggs.isEmpty()) {
             return (A) root.buildEmptyAggregation();

@@ -44,6 +44,7 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;

@@ -82,15 +83,15 @@ import org.elasticsearch.search.aggregations.bucket.range.ParsedRange;
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;

@@ -392,10 +393,15 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
             bigArrays, mockScriptService, bucketConsumer, PipelineTree.EMPTY);
         @SuppressWarnings("unchecked")
         T reduced = (T) inputs.get(0).reduce(toReduce, context);
-        assertMultiBucketConsumer(reduced, bucketConsumer);
+        doAssertReducedMultiBucketConsumer(reduced, bucketConsumer);
         assertReduced(reduced, inputs);
     }

+    protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
+    }
+
     /**
      * overwrite in tests that need it
      */

@@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

@@ -184,4 +185,11 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

@@ -135,9 +135,9 @@ public abstract class JdbcErrorsTestCase extends JdbcIntegrationTestCase {
         try (Connection c = esJdbc()) {
             SQLException e = expectThrows(
                 SQLException.class,
-                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000").executeQuery()
+                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000").executeQuery()
             );
-            assertEquals("The maximum LIMIT for aggregate sorting is [10000], received [12000]", e.getMessage());
+            assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage());
         }
     }
 }

@@ -117,9 +117,9 @@ public abstract class ErrorsTestCase extends CliIntegrationTestCase implements o
     @Override
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("test", body -> body.field("a", 1).field("b", 2));
-        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000");
+        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000");
         assertEquals(
-            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [10000], received [12000]" + END,
+            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [65535], received [120000]" + END,
             commandResult
         );
     }

@@ -439,8 +439,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("{\"a\": 1, \"b\": 2}");
         expectBadRequest(
-            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"),
-            containsString("The maximum LIMIT for aggregate sorting is [10000], received [12000]")
+            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"),
+            containsString("The maximum LIMIT for aggregate sorting is [65535], received [120000]")
         );
     }

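The SQL test updates above track the same constant: the maximum LIMIT for aggregate sorting now reports [65535] instead of [10000], so the tests use LIMIT 120000 to stay above it. Conversely, a query that sorts on an aggregate with a LIMIT between the old and the new cap should now be accepted; a hedged JDBC sketch, where the connection URL and the test table are assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class AggregateSortLimitSketch {
    public static void main(String[] args) throws Exception {
        // Assumed Elasticsearch SQL JDBC URL and table; adjust for a real cluster.
        try (Connection c = DriverManager.getConnection("jdbc:es://localhost:9200");
             Statement s = c.createStatement();
             // LIMIT 50000 exceeded the old [10000] cap but stays under the new [65535] one.
             ResultSet rs = s.executeQuery(
                 "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 50000")) {
            while (rs.next()) {
                System.out.println(rs.getDouble(1));
            }
        }
    }
}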