From da72a3a51dcea569a4b1b0a966a4d35a2b49075f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 11 Jun 2020 09:15:12 -0400 Subject: [PATCH] Speed up reducing auto_date_histo with a time zone (backport of #57933) (#57958) When reducing `auto_date_histogram` we were using `Rounding#round` which is quite a bit more expensive than ``` Rounding.Prepared prepared = rounding.prepare(min, max); long result = prepared.round(date); ``` when rounding to a non-fixed time zone like `America/New_York`. This stops using the former and starts using the latter. Relates to #56124 --- .../histogram/InternalAutoDateHistogram.java | 131 +++++++++++------- 1 file changed, 83 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 630e4ced5f8..a7e33377067 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -293,15 +293,17 @@ public final class InternalAutoDateHistogram extends // First we need to find the highest level rounding used across all the // shards int reduceRoundingIdx = 0; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; for (InternalAggregation aggregation : aggregations) { - int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx; - if (aggRoundingIdx > reduceRoundingIdx) { - reduceRoundingIdx = aggRoundingIdx; + InternalAutoDateHistogram agg = ((InternalAutoDateHistogram) aggregation); + reduceRoundingIdx = Math.max(agg.bucketInfo.roundingIdx, reduceRoundingIdx); + if (false == agg.buckets.isEmpty()) { + min = Math.min(min, agg.buckets.get(0).key); + max = Math.max(max, agg.buckets.get(agg.buckets.size() - 1).key); } } - // This rounding will be used to reduce all the buckets - RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx]; - Rounding reduceRounding = reduceRoundingInfo.rounding; + Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max); final PriorityQueue pq = new PriorityQueue(aggregations.size()) { @Override @@ -351,21 +353,33 @@ public final class InternalAutoDateHistogram extends } } - return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext); + return mergeBucketsIfNeeded( + new BucketReduceResult(reducedBuckets, reduceRoundingIdx, 1, reduceRounding, min, max), + reduceContext + ); } - private BucketReduceResult mergeBucketsIfNeeded(List reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo, - ReduceContext reduceContext) { - while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval()) - && reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) { - reduceRoundingIdx++; - reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx]; - reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext); + private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, ReduceContext reduceContext) { + int idx = firstPassResult.roundingIdx; + RoundingInfo info = bucketInfo.roundingInfos[idx]; + List buckets = firstPassResult.buckets; + Rounding.Prepared prepared = firstPassResult.preparedRounding; + while (buckets.size() > (targetBuckets * info.getMaximumInnerInterval()) + && idx < bucketInfo.roundingInfos.length - 1) { + idx++; + info = bucketInfo.roundingInfos[idx]; + prepared = prepare(idx, firstPassResult.min, firstPassResult.max); + buckets = mergeBuckets(buckets, prepared, reduceContext); } - return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx, 1); + return new BucketReduceResult(buckets, idx, 1, prepared, firstPassResult.min, firstPassResult.max); } - private List mergeBuckets(List reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) { + private Rounding.Prepared prepare(int idx, long min, long max) { + Rounding rounding = bucketInfo.roundingInfos[idx].rounding; + return min <= max ? rounding.prepare(min, max) : rounding.prepareForUnknown(); + } + + private List mergeBuckets(List reducedBuckets, Rounding.Prepared reduceRounding, ReduceContext reduceContext) { List mergedBuckets = new ArrayList<>(); List sameKeyedBuckets = new ArrayList<>(); @@ -405,35 +419,53 @@ public final class InternalAutoDateHistogram extends } private static class BucketReduceResult { - List buckets; - RoundingInfo roundingInfo; - int roundingIdx; - long innerInterval; + final List buckets; + final int roundingIdx; + final long innerInterval; + final Rounding.Prepared preparedRounding; + final long min; + final long max; - BucketReduceResult(List buckets, RoundingInfo roundingInfo, int roundingIdx, long innerInterval) { + BucketReduceResult( + List buckets, + int roundingIdx, + long innerInterval, + Rounding.Prepared preparedRounding, + long min, + long max + ) { this.buckets = buckets; - this.roundingInfo = roundingInfo; this.roundingIdx = roundingIdx; this.innerInterval = innerInterval; + this.preparedRounding = preparedRounding; + this.min = min; + this.max = max; } } - private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) { - List list = currentResult.buckets; + private BucketReduceResult addEmptyBuckets(BucketReduceResult current, ReduceContext reduceContext) { + List list = current.buckets; if (list.isEmpty()) { - return currentResult; + return current; } - int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx, - bucketInfo.roundingInfos, targetBuckets); - RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx]; - Rounding rounding = roundingInfo.rounding; + int roundingIdx = getAppropriateRounding( + list.get(0).key, + list.get(list.size() - 1).key, + current.roundingIdx, + bucketInfo.roundingInfos, + targetBuckets + ); + Rounding.Prepared rounding = current.roundingIdx == roundingIdx + ? current.preparedRounding + : prepare(roundingIdx, current.min, current.max); // merge buckets using the new rounding list = mergeBuckets(list, rounding, reduceContext); Bucket lastBucket = null; ListIterator iter = list.listIterator(); - InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations), - reduceContext); + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce( + org.elasticsearch.common.collect.List.of(bucketInfo.emptySubAggregations), reduceContext + ); // Add the empty buckets within the data, // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 @@ -449,7 +481,7 @@ public final class InternalAutoDateHistogram extends } lastBucket = iter.next(); } - return new BucketReduceResult(list, roundingInfo, roundingIdx, currentResult.innerInterval); + return new BucketReduceResult(list, roundingIdx, 1, rounding, current.min, current.max); } static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, @@ -501,8 +533,7 @@ public final class InternalAutoDateHistogram extends reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext); // Adding empty buckets may have tipped us over the target so merge the buckets again if needed - reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx, - reducedBucketsResult.roundingInfo, reduceContext); + reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult, reduceContext); // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext); @@ -515,11 +546,9 @@ public final class InternalAutoDateHistogram extends getMetadata(), reducedBucketsResult.innerInterval); } - private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult, - ReduceContext reduceContext) { - List buckets = reducedBucketsResult.buckets; - RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo; - int roundingIdx = reducedBucketsResult.roundingIdx; + private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, ReduceContext reduceContext) { + List buckets = current.buckets; + RoundingInfo roundingInfo = bucketInfo.roundingInfos[current.roundingIdx]; if (buckets.size() > targetBuckets) { for (int interval : roundingInfo.innerIntervals) { int resultingBuckets = buckets.size() / interval; @@ -527,32 +556,38 @@ public final class InternalAutoDateHistogram extends resultingBuckets++; } if (resultingBuckets <= targetBuckets) { - return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext); + return mergeConsecutiveBuckets(current, interval, reduceContext); } } } - return reducedBucketsResult; + return current; } - private BucketReduceResult mergeConsecutiveBuckets(List reducedBuckets, int mergeInterval, int roundingIdx, - RoundingInfo roundingInfo, ReduceContext reduceContext) { + private BucketReduceResult mergeConsecutiveBuckets(BucketReduceResult current, int mergeInterval, ReduceContext reduceContext) { List mergedBuckets = new ArrayList<>(); List sameKeyedBuckets = new ArrayList<>(); - double key = roundingInfo.rounding.round(reducedBuckets.get(0).key); - for (int i = 0; i < reducedBuckets.size(); i++) { - Bucket bucket = reducedBuckets.get(i); + double key = current.preparedRounding.round(current.buckets.get(0).key); + for (int i = 0; i < current.buckets.size(); i++) { + Bucket bucket = current.buckets.get(i); if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) { mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); sameKeyedBuckets.clear(); - key = roundingInfo.rounding.round(bucket.key); + key = current.preparedRounding.round(bucket.key); } sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations)); } if (sameKeyedBuckets.isEmpty() == false) { mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); } - return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval); + return new BucketReduceResult( + mergedBuckets, + current.roundingIdx, + mergeInterval, + current.preparedRounding, + current.min, + current.max + ); } @Override