diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc
index 30cb7d604ff..134c6a848a5 100644
--- a/docs/reference/aggregations/bucket.asciidoc
+++ b/docs/reference/aggregations/bucket.asciidoc
@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke
 
 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<>. It defaults to 10,000,
+<>. It defaults to 65,535,
 requests that try to return more than the limit will fail with an exception.
 
 include::bucket/adjacency-matrix-aggregation.asciidoc[]
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
index a3a7e52167a..9cbc4eeab9c 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
@@ -35,10 +35,10 @@ import java.util.function.IntConsumer;
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
@@ -102,6 +102,7 @@ public class MultiBucketConsumerService {
 
         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;
 
         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
@@ -110,15 +111,17 @@ public class MultiBucketConsumerService {
 
         @Override
         public void accept(int value) {
-            count += value;
-            if (count > limit) {
-                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
-                    + "] but was [" + count + "]. This limit can be set by changing the [" +
-                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            if (value != 0) {
+                count += value;
+                if (count > limit) {
+                    throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                        + "] but was [" + count + "]. This limit can be set by changing the [" +
+                        MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+                }
             }
-
-            // check parent circuit breaker every 1024 buckets
-            if (value > 0 && (count & 0x3FF) == 0) {
+            // check parent circuit breaker every 1024 calls
+            callCount++;
+            if ((callCount & 0x3FF) == 0) {
                 breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
             }
         }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
index dca1c94feab..2f088b98e1d 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
@@ -57,12 +57,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
                              Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }
 
     /**
@@ -91,7 +91,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that we are passing 0 as a bucket count.
+            multiBucketConsumer.accept(0);
+        }
         subCollector.collect(doc, bucketOrd);
     }
 
@@ -137,14 +142,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
         }
     }
 
-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
     /**
     * Hook to allow taking an action before building buckets.
     */
@@ -186,7 +183,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
                 public int size() {
                     return aggregations.length;
                 }
-           });
+            });
         }
         return result;
     }
@@ -267,7 +264,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
             BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
      * @param owningBucketOrds owning bucket ordinals for which to build the results
      * @param resultBuilder how to build a result from the sub aggregation results
      */
-    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, 
+    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
                                                                            SingleBucketResultBuilder resultBuilder) throws IOException {
         /*
          * It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +355,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                 + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
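The rewritten `accept` above separates bucket accounting from memory checks: non-zero values (now passed only at reduce time) count toward `search.max_buckets`, while every call, including the `accept(0)` probes from `collectExistingBucket`, advances `callCount`, and the parent circuit breaker is consulted once every 1024 calls because `0x3FF` masks the low ten bits. A minimal standalone sketch of that accounting (plain Java, not the Elasticsearch class; the exception type and printouts are stand-ins):

```java
import java.util.function.IntConsumer;

/** Standalone sketch of the consumer's new accounting; not the Elasticsearch class itself. */
class BucketLimitSketch implements IntConsumer {
    private final int limit;
    private int count;     // buckets accumulated so far (only non-zero values contribute)
    private int callCount; // every invocation counts, pacing the breaker checks

    BucketLimitSketch(int limit) {
        this.limit = limit;
    }

    @Override
    public void accept(int value) {
        if (value != 0) {
            count += value;
            if (count > limit) {
                // stand-in for TooManyBucketsException
                throw new IllegalStateException("too many buckets: " + count + " > " + limit);
            }
        }
        // 0x3FF masks the low ten bits, so this branch fires once every 1024 calls;
        // the real class asks the parent circuit breaker for a memory check here.
        callCount++;
        if ((callCount & 0x3FF) == 0) {
            System.out.println("breaker check after " + callCount + " calls");
        }
    }

    public static void main(String[] args) {
        BucketLimitSketch consumer = new BucketLimitSketch(65535);
        for (int i = 0; i < 2048; i++) {
            consumer.accept(0); // collection phase: zero-valued probes trigger memory checks only
        }
        try {
            consumer.accept(65536); // reduce phase: real counts can trip the limit
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```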
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
index fed6916b2fc..993085cba57 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
@@ -184,7 +184,6 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
index deba1a10c1e..6a9d51c3ef3 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
@@ -196,12 +196,10 @@ public class InternalAdjacencyMatrix
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));
 
         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
index ed8f2d20427..8cee8087a07 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -136,7 +136,6 @@ final class CompositeAggregator extends BucketsAggregator {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
index 3e0f8de5e67..db2dc5e34d7 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
@@ -193,7 +193,6 @@ public class InternalComposite
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;
@@ -207,7 +206,6 @@ public class InternalComposite
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }
 
@@ -220,6 +218,7 @@ public class InternalComposite
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }
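Both reduce implementations above now follow the same pattern: build the final reduced bucket list first, then report its size to the consumer once at the end, instead of interleaving `+1`/`-countInnerBucket(...)` adjustments with the merge. A simplified sketch of that pattern, with plain numeric doc counts standing in for `InternalBucket`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Sketch of the reduce-side pattern: build the final list, then count once. */
class ReducePhaseCounting {
    static List<Long> reduceBuckets(List<List<Long>> sameKeyGroups, IntConsumer bucketConsumer) {
        List<Long> reduced = new ArrayList<>();
        for (List<Long> sameKey : sameKeyGroups) {
            long docCount = sameKey.stream().mapToLong(Long::longValue).sum();
            if (docCount >= 1) { // mirrors the docCount filter in InternalAdjacencyMatrix
                reduced.add(docCount);
            }
        }
        // One call with the final size replaces the old incremental bookkeeping.
        bucketConsumer.accept(reduced.size());
        return reduced;
    }

    public static void main(String[] args) {
        IntConsumer consumer = count -> System.out.println("consumed " + count + " buckets");
        System.out.println(reduceBuckets(List.of(List.of(1L, 2L), List.of(0L)), consumer));
    }
}
```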
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
index f44ec9263ad..16ebd6a7db3 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
@@ -109,8 +109,7 @@ public abstract class GeoGridAggregator extends Bucke
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);
-
+
             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -118,7 +117,7 @@ public abstract class GeoGridAggregator extends Bucke
                 if (spare == null) {
                     spare = newEmptyBucket();
                 }
-
+
                 // need a special function to keep the source bucket
                 // up-to-date so it can get the appropriate key
                 spare.hashAsLong = ordsEnum.value();
@@ -126,7 +125,7 @@ public abstract class GeoGridAggregator extends Bucke
                 spare.bucketOrd = ordsEnum.ord();
                 spare = ordered.insertWithOverflow(spare);
             }
-
+
             topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBucketsPerOrd[ordIdx][i] = ordered.pop();
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
index c967787ed67..a1bd27d1f29 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
@@ -100,18 +100,14 @@ public abstract class InternalGeoGrid
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
index 7465ae27de7..630e4ced5f8 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
@@ -328,7 +328,6 @@ public final class InternalAutoDateHistogram extends
             if (reduceRounding.round(top.current.key) != key) {
                 // the key changes, reduce what we already buffered and reset the buffer for current buckets
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
                 currentBuckets.clear();
                 key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@ public final class InternalAutoDateHistogram extends
 
         if (currentBuckets.isEmpty() == false) {
             final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             reducedBuckets.add(reduced);
         }
     }
@@ -376,22 +374,17 @@ public final class InternalAutoDateHistogram extends
                 long roundedBucketKey = reduceRounding.round(bucket.key);
                 if (Double.isNaN(key)) {
                     key = roundedBucketKey;
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 } else if (roundedBucketKey == key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                     sameKeyedBuckets.clear();
                     key = roundedBucketKey;
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 }
             }
             if (sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
             }
             reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ public final class InternalAutoDateHistogram extends
         if (lastBucket != null) {
             long key = rounding.nextRoundingValue(lastBucket.key);
             while (key < nextBucket.key) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                 key = rounding.nextRoundingValue(key);
             }
@@ -515,7 +507,7 @@ public final class InternalAutoDateHistogram extends
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
             this.bucketInfo.emptySubAggregations);
 
@@ -551,16 +543,13 @@ public final class InternalAutoDateHistogram extends
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
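The shard-side `consumeBucketsAndMaybeBreak(size)` calls removed above were counting buckets that a priority queue may immediately evict, so a request could trip the limit on buckets that never reach the response. A toy illustration of that `insertWithOverflow`-style truncation using a plain `java.util.PriorityQueue` (the `shardSize` value and doc counts are invented):

```java
import java.util.PriorityQueue;

/** Only the top `size` buckets survive, so counting before truncation overstates the response. */
class TopBucketsSketch {
    public static void main(String[] args) {
        int size = 3; // stand-in for shardSize
        PriorityQueue<Long> topDocCounts = new PriorityQueue<>(); // min-heap, head = weakest bucket
        long[] docCounts = {5, 42, 7, 19, 3, 28};
        for (long docCount : docCounts) {
            topDocCounts.offer(docCount);
            if (topDocCounts.size() > size) {
                topDocCounts.poll(); // overflow: evict the smallest, like BucketPriorityQueue.insertWithOverflow
            }
        }
        // Only three buckets remain (19, 28, 42); the reduce phase counts these, not all six.
        System.out.println(topDocCounts);
    }
}
```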
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
index 0965dedc047..d40b71bb2bc 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
@@ -329,10 +329,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 // the key changes, reduce what we already buffered and reset the buffer for current buckets
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
                 currentBuckets.clear();
                 key = top.current.key;
@@ -353,10 +350,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         if (currentBuckets.isEmpty() == false) {
             final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
             if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
             }
         }
     }
@@ -396,7 +390,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = bounds.getMin() + offset;
             long max = bounds.getMax() + offset;
             while (key <= max) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 key = nextKey(key).longValue();
             }
@@ -406,7 +399,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = bounds.getMin() + offset;
             if (key < firstBucket.key) {
                 while (key < firstBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }
@@ -422,7 +414,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 if (lastBucket != null) {
                     long key = nextKey(lastBucket.key).longValue();
                     while (key < nextBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }
@@ -436,7 +427,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = nextKey(lastBucket.key).longValue();
             long max = bounds.getMax() + offset;
             while (key <= max) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 key = nextKey(key).longValue();
             }
@@ -462,6 +452,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
                 format, keyed, getMetadata());
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
index 318ffec596b..ac31c12c994 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
@@ -313,10 +313,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
                 currentBuckets.clear();
                 key = top.current.key;
@@ -337,10 +334,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -380,7 +374,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation
         buildPriorityQueue(int size) {
             return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
index fde74735c9e..52327d02ad9 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
@@ -113,8 +113,6 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator
             multiBucketConsumer.accept(1024));
         assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
index 48a953818c1..d31bf4279e4 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
@@ -36,8 +36,10 @@ import org.elasticsearch.common.geo.GeoBoundingBoxTests;
 import org.elasticsearch.common.geo.GeoUtils;
 import org.elasticsearch.index.mapper.GeoPointFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -291,4 +293,11 @@ public abstract class GeoGridAggregatorTestCase
         indexReader.close();
         directory.close();
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
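`doAssertReducedMultiBucketConsumer` (introduced further down in `InternalAggregationTestCase`) is a template-method hook: the base class keeps the strict consumer assertion, and suites like this one override it with a no-op where the consumer's tally legitimately diverges from the bucket count of the reduced result. A name-simplified sketch of the shape of that hook, not the actual framework classes:

```java
/** Name-simplified sketch of the template-method hook used by the test framework. */
abstract class ReduceTestBase {
    // Default: strict check that reduction consumed exactly as many buckets as it produced.
    protected void doAssertReducedConsumer(int reducedBucketCount, int consumedCount) {
        if (reducedBucketCount != consumedCount) {
            throw new AssertionError("expected " + reducedBucketCount + ", consumer saw " + consumedCount);
        }
    }
}

class GeoGridReduceTest extends ReduceTestBase {
    @Override
    protected void doAssertReducedConsumer(int reducedBucketCount, int consumedCount) {
        // No-op: geo-grid reduce can evict buckets after they were counted,
        // so the strict equality no longer holds for this suite.
    }

    public static void main(String[] args) {
        new GeoGridReduceTest().doAssertReducedConsumer(10, 12); // passes only because of the override
        System.out.println("override skipped the strict assertion");
    }
}
```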
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
index f6e28f7615f..8e2239d18ed 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
@@ -38,6 +38,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@@ -869,4 +870,11 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
             i += 1;
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
index 0f16bb631dd..d60536409e1 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
@@ -38,10 +38,8 @@ import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -967,71 +965,6 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
         );
     }
 
-    public void testMaxBucket() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.fixedInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .fixedInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-    }
-
-    public void testMaxBucketDeprecated() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .dateHistogramInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-        assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
-    }
-
     public void testFixedWithCalendar() throws IOException {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(new MatchAllDocsQuery(),
             Arrays.asList(
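With per-shard accounting gone, `testMaxBucket` and `testMaxBucketDeprecated` can no longer fail during collection, which is why they are deleted here rather than updated; the limit now trips during reduce. The scenarios they covered remain extreme, as a back-of-the-envelope count of the 5-second buckets that `minDocCount(0)` would have to materialize between the 2010 and 2017 timestamps shows (plain Java, dates taken from the removed tests):

```java
import java.time.Duration;
import java.time.Instant;

/** Why a 5s fixed interval with empty buckets over 2010-2017 trips any sane bucket limit. */
class BucketCountEstimate {
    public static void main(String[] args) {
        Instant start = Instant.parse("2010-01-01T00:00:00Z");
        Instant end = Instant.parse("2017-01-01T00:00:00Z");
        long buckets = Duration.between(start, end).getSeconds() / 5;
        // Roughly 44 million buckets, far beyond the old 10,000 or the new 65,535 default.
        System.out.println(buckets + " potential buckets vs limit 65535");
    }
}
```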
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
index 7e144f11497..fd49e57e7c9 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
@@ -164,7 +164,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
         /*
          * Guess the interval to use based on the roughly estimated
          * duration. It'll be accurate or it'll produce more buckets
-         * than we need but it is quick. 
+         * than we need but it is quick.
          */
         if (normalizedDuration != 0) {
             for (int j = roundingInfo.innerIntervals.length-1; j >= 0; j--) {
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
index 2fb281170b7..14f9d9871b4 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
@@ -36,7 +36,9 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 
 import java.io.IOException;
@@ -304,4 +306,11 @@ public class DateRangeAggregatorTests extends AggregatorTestCase {
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 2cd69a8e458..531188c2e6b 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -427,7 +427,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
         a.postCollection();
         @SuppressWarnings("unchecked")
         A result = (A) a.buildTopLevel();
-        InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer);
         return result;
     }
 
@@ -498,7 +497,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
                 a.postCollection();
                 InternalAggregation agg = a.buildTopLevel();
                 aggs.add(agg);
-                InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
             }
             if (aggs.isEmpty()) {
                 return (A) root.buildEmptyAggregation();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
index 128f4debce7..a00456142c3 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
@@ -44,6 +44,7 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
@@ -82,15 +83,15 @@ import org.elasticsearch.search.aggregations.bucket.range.ParsedRange;
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
@@ -392,10 +393,15 @@ public abstract class InternalAggregationTestCase
             bigArrays, mockScriptService, bucketConsumer, PipelineTree.EMPTY);
         @SuppressWarnings("unchecked")
         T reduced = (T) inputs.get(0).reduce(toReduce, context);
-        assertMultiBucketConsumer(reduced, bucketConsumer);
+        doAssertReducedMultiBucketConsumer(reduced, bucketConsumer);
         assertReduced(reduced, inputs);
     }
 
+    protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
+    }
+
+
     /**
      * overwrite in tests that need it
      */
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
index c8d1fbee2f0..ec8204b362a 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
@@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@@ -184,4 +185,11 @@ public abstract class InternalMultiBucketAggregationTestCase
-                c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000").executeQuery()
+                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000").executeQuery()
             );
-            assertEquals("The maximum LIMIT for aggregate sorting is [10000], received [12000]", e.getMessage());
+            assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage());
         }
     }
 }
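SQL's sort-on-aggregate path buffers one bucket per row, so its LIMIT ceiling tracks the `search.max_buckets` default; the assertions here and below move from 10,000 to 65,535 accordingly. A hypothetical client-side probe of the new ceiling through plain JDBC (connection URL and index name are placeholders, and the Elasticsearch SQL JDBC driver is assumed to be on the classpath):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical probe of the new LIMIT ceiling; URL and index are placeholders. */
class SqlLimitProbe {
    public static void main(String[] args) throws SQLException {
        try (Connection c = DriverManager.getConnection("jdbc:es://localhost:9200")) {
            // LIMIT 65535 is now the largest value accepted when sorting on an aggregate;
            // 120000 would be rejected with "The maximum LIMIT for aggregate sorting is [65535]".
            c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 65535")
                .executeQuery();
        }
    }
}
```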
[12000]", e.getMessage()); + assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage()); } } } diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java index 5cae19fc69b..5fb92b19741 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java @@ -117,9 +117,9 @@ public abstract class ErrorsTestCase extends CliIntegrationTestCase implements o @Override public void testHardLimitForSortOnAggregate() throws Exception { index("test", body -> body.field("a", 1).field("b", 2)); - String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"); + String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"); assertEquals( - START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [10000], received [12000]" + END, + START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [65535], received [120000]" + END, commandResult ); } diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index 4bb9b9da140..9ef1067a498 100644 --- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -439,8 +439,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err public void testHardLimitForSortOnAggregate() throws Exception { index("{\"a\": 1, \"b\": 2}"); expectBadRequest( - () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"), - containsString("The maximum LIMIT for aggregate sorting is [10000], received [12000]") + () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"), + containsString("The maximum LIMIT for aggregate sorting is [65535], received [120000]") ); }