Speed up reducing auto_date_histo with a time zone (backport of #57933) (#57958)

When reducing `auto_date_histogram` we were using `Rounding#round`
which is quite a bit more expensive than
```
Rounding.Prepared prepared = rounding.prepare(min, max);
long result = prepared.round(date);
```
when rounding to a non-fixed time zone like `America/New_York`. This
stops using the former and starts using the latter.

Relates to #56124
This commit is contained in:
Nik Everett 2020-06-11 09:15:12 -04:00 committed by GitHub
parent 54d4f2a623
commit da72a3a51d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 83 additions and 48 deletions

View File

@@ -293,15 +293,17 @@ public final class InternalAutoDateHistogram extends
// First we need to find the highest level rounding used across all the
// shards
int reduceRoundingIdx = 0;
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (InternalAggregation aggregation : aggregations) {
int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx;
if (aggRoundingIdx > reduceRoundingIdx) {
reduceRoundingIdx = aggRoundingIdx;
InternalAutoDateHistogram agg = ((InternalAutoDateHistogram) aggregation);
reduceRoundingIdx = Math.max(agg.bucketInfo.roundingIdx, reduceRoundingIdx);
if (false == agg.buckets.isEmpty()) {
min = Math.min(min, agg.buckets.get(0).key);
max = Math.max(max, agg.buckets.get(agg.buckets.size() - 1).key);
}
}
// This rounding will be used to reduce all the buckets
RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
Rounding reduceRounding = reduceRoundingInfo.rounding;
Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
@Override
@@ -351,21 +353,33 @@ public final class InternalAutoDateHistogram extends
}
}
return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext);
return mergeBucketsIfNeeded(
new BucketReduceResult(reducedBuckets, reduceRoundingIdx, 1, reduceRounding, min, max),
reduceContext
);
}
private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo,
ReduceContext reduceContext) {
while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval())
&& reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) {
reduceRoundingIdx++;
reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext);
private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, ReduceContext reduceContext) {
int idx = firstPassResult.roundingIdx;
RoundingInfo info = bucketInfo.roundingInfos[idx];
List<Bucket> buckets = firstPassResult.buckets;
Rounding.Prepared prepared = firstPassResult.preparedRounding;
while (buckets.size() > (targetBuckets * info.getMaximumInnerInterval())
&& idx < bucketInfo.roundingInfos.length - 1) {
idx++;
info = bucketInfo.roundingInfos[idx];
prepared = prepare(idx, firstPassResult.min, firstPassResult.max);
buckets = mergeBuckets(buckets, prepared, reduceContext);
}
return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx, 1);
return new BucketReduceResult(buckets, idx, 1, prepared, firstPassResult.min, firstPassResult.max);
}
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
/**
 * Prepare the rounding at {@code idx} in {@code bucketInfo.roundingInfos},
 * optimizing it for the {@code [min, max]} key range when that range is
 * valid. When {@code min > max} (no buckets were seen) we fall back to
 * {@link Rounding#prepareForUnknown()}.
 */
private Rounding.Prepared prepare(int idx, long min, long max) {
    Rounding rounding = bucketInfo.roundingInfos[idx].rounding;
    if (min > max) {
        // Empty range: nothing to optimize the rounding for.
        return rounding.prepareForUnknown();
    }
    return rounding.prepare(min, max);
}
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding.Prepared reduceRounding, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();
@@ -405,35 +419,53 @@ public final class InternalAutoDateHistogram extends
}
private static class BucketReduceResult {
List<Bucket> buckets;
RoundingInfo roundingInfo;
int roundingIdx;
long innerInterval;
final List<Bucket> buckets;
final int roundingIdx;
final long innerInterval;
final Rounding.Prepared preparedRounding;
final long min;
final long max;
BucketReduceResult(List<Bucket> buckets, RoundingInfo roundingInfo, int roundingIdx, long innerInterval) {
/**
 * Snapshot of one reduction pass over the buckets.
 *
 * @param buckets the buckets produced by this pass
 * @param roundingIdx index into {@code bucketInfo.roundingInfos} of the rounding in use
 * @param innerInterval how many consecutive buckets were merged together at this
 *        rounding (1 when no consecutive merging has happened)
 * @param preparedRounding the rounding prepared for the {@code [min, max]} key range
 * @param min smallest bucket key across the reduced aggregations, or
 *        {@code Long.MAX_VALUE} when there were no buckets
 * @param max largest bucket key across the reduced aggregations, or
 *        {@code Long.MIN_VALUE} when there were no buckets
 */
BucketReduceResult(
    List<Bucket> buckets,
    int roundingIdx,
    long innerInterval,
    Rounding.Prepared preparedRounding,
    long min,
    long max
) {
    this.buckets = buckets;
    this.roundingIdx = roundingIdx;
    this.innerInterval = innerInterval;
    this.preparedRounding = preparedRounding;
    this.min = min;
    this.max = max;
}
}
private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
List<Bucket> list = currentResult.buckets;
private BucketReduceResult addEmptyBuckets(BucketReduceResult current, ReduceContext reduceContext) {
List<Bucket> list = current.buckets;
if (list.isEmpty()) {
return currentResult;
return current;
}
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
bucketInfo.roundingInfos, targetBuckets);
RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx];
Rounding rounding = roundingInfo.rounding;
int roundingIdx = getAppropriateRounding(
list.get(0).key,
list.get(list.size() - 1).key,
current.roundingIdx,
bucketInfo.roundingInfos,
targetBuckets
);
Rounding.Prepared rounding = current.roundingIdx == roundingIdx
? current.preparedRounding
: prepare(roundingIdx, current.min, current.max);
// merge buckets using the new rounding
list = mergeBuckets(list, rounding, reduceContext);
Bucket lastBucket = null;
ListIterator<Bucket> iter = list.listIterator();
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations),
reduceContext);
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(
org.elasticsearch.common.collect.List.of(bucketInfo.emptySubAggregations), reduceContext
);
// Add the empty buckets within the data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
@@ -449,7 +481,7 @@ public final class InternalAutoDateHistogram extends
}
lastBucket = iter.next();
}
return new BucketReduceResult(list, roundingInfo, roundingIdx, currentResult.innerInterval);
return new BucketReduceResult(list, roundingIdx, 1, rounding, current.min, current.max);
}
static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
@@ -501,8 +533,7 @@ public final class InternalAutoDateHistogram extends
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
reducedBucketsResult.roundingInfo, reduceContext);
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult, reduceContext);
// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
@@ -515,11 +546,9 @@ public final class InternalAutoDateHistogram extends
getMetadata(), reducedBucketsResult.innerInterval);
}
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult,
ReduceContext reduceContext) {
List<Bucket> buckets = reducedBucketsResult.buckets;
RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
int roundingIdx = reducedBucketsResult.roundingIdx;
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, ReduceContext reduceContext) {
List<Bucket> buckets = current.buckets;
RoundingInfo roundingInfo = bucketInfo.roundingInfos[current.roundingIdx];
if (buckets.size() > targetBuckets) {
for (int interval : roundingInfo.innerIntervals) {
int resultingBuckets = buckets.size() / interval;
@@ -527,32 +556,38 @@ public final class InternalAutoDateHistogram extends
resultingBuckets++;
}
if (resultingBuckets <= targetBuckets) {
return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext);
return mergeConsecutiveBuckets(current, interval, reduceContext);
}
}
}
return reducedBucketsResult;
return current;
}
private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets, int mergeInterval, int roundingIdx,
RoundingInfo roundingInfo, ReduceContext reduceContext) {
private BucketReduceResult mergeConsecutiveBuckets(BucketReduceResult current, int mergeInterval, ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();
double key = roundingInfo.rounding.round(reducedBuckets.get(0).key);
for (int i = 0; i < reducedBuckets.size(); i++) {
Bucket bucket = reducedBuckets.get(i);
double key = current.preparedRounding.round(current.buckets.get(0).key);
for (int i = 0; i < current.buckets.size(); i++) {
Bucket bucket = current.buckets.get(i);
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundingInfo.rounding.round(bucket.key);
key = current.preparedRounding.round(bucket.key);
}
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
return new BucketReduceResult(
mergedBuckets,
current.roundingIdx,
mergeInterval,
current.preparedRounding,
current.min,
current.max
);
}
@Override