Improve reduction of terms aggregations (#61779) (#62028)

Today, the terms aggregation reduces multiple aggregations at once, using a map
to group identical buckets together. This operation can be costly since it
requires looking up every bucket in a global map, in no particular order.
This commit changes how term buckets are sorted by shards and partial reduces
so that results can be reduced with a merge-sort strategy.
For backwards compatibility, results are merged with the legacy code if any of
the aggregations use a different sort (e.g. because they were returned by a node
running a prior version).

Relates #51857
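
In essence, the change replaces "look up every bucket of every shard response in
one global hash map" with a k-way merge over per-shard bucket lists that already
arrive sorted by key: equal keys come out of the merge adjacently, so same-key
buckets can be combined on the fly. A standalone sketch of that shape
(illustrative code, not the PR's; it uses java.util.PriorityQueue where the real
implementation uses Lucene's, whose updateTop() avoids a remove-and-re-add):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MergeReduceSketch {
    record Bucket(String key, long docCount) {}

    // Cursor over one shard's sorted buckets, mirroring the IteratorAndCurrent
    // helper introduced by this commit.
    private static final class Cursor {
        final Iterator<Bucket> it;
        Bucket current;
        Cursor(Iterator<Bucket> it) { this.it = it; this.current = it.next(); }
    }

    static List<Bucket> reduce(List<List<Bucket>> shardResults) {
        PriorityQueue<Cursor> pq =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current.key));
        for (List<Bucket> shard : shardResults) {
            if (shard.isEmpty() == false) {
                pq.add(new Cursor(shard.iterator()));
            }
        }
        List<Bucket> reduced = new ArrayList<>();
        String lastKey = null;
        long docCount = 0;
        while (pq.isEmpty() == false) {
            Cursor top = pq.poll();
            if (lastKey != null && lastKey.equals(top.current.key) == false) {
                reduced.add(new Bucket(lastKey, docCount));  // key changed: flush
                docCount = 0;
            }
            lastKey = top.current.key;
            docCount += top.current.docCount;
            if (top.it.hasNext()) {
                top.current = top.it.next();
                pq.add(top);  // java.util.PriorityQueue has no updateTop()
            }
        }
        if (lastKey != null) {
            reduced.add(new Bucket(lastKey, docCount));
        }
        return reduced;
    }

    public static void main(String[] args) {
        List<Bucket> s1 = List.of(new Bucket("a", 2), new Bucket("c", 1));
        List<Bucket> s2 = List.of(new Bucket("a", 3), new Bucket("b", 5));
        System.out.println(reduce(List.of(s1, s2)));  // a:5, b:5, c:1 in key order
    }
}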
Jim Ferenczi 2020-09-07 13:13:20 +02:00 committed by GitHub
parent b07b75ce14
commit fa8e76abb1
24 changed files with 418 additions and 242 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import java.util.Iterator;
public class IteratorAndCurrent<B extends InternalMultiBucketAggregation.InternalBucket> implements Iterator<B> {
private final Iterator<B> iterator;
private B current;
public IteratorAndCurrent(Iterator<B> iterator) {
this.iterator = iterator;
this.current = iterator.next();
}
public Iterator<B> getIterator() {
return iterator;
}
public B current() {
return current;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public B next() {
return current = iterator.next();
}
}
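
The histogram and terms reducers below all use this new helper as the element
type of a Lucene org.apache.lucene.util.PriorityQueue. A hypothetical usage
sketch (LongKeyBucket is a stand-in for a concrete InternalBucket subclass with
a public long key field; the queue calls are the real Lucene API):

import org.apache.lucene.util.PriorityQueue;
import java.util.List;

class IteratorAndCurrentExample {
    static void printInKeyOrder(List<List<LongKeyBucket>> shardResults) {
        PriorityQueue<IteratorAndCurrent<LongKeyBucket>> pq =
            new PriorityQueue<IteratorAndCurrent<LongKeyBucket>>(shardResults.size()) {
                @Override
                protected boolean lessThan(IteratorAndCurrent<LongKeyBucket> a,
                                           IteratorAndCurrent<LongKeyBucket> b) {
                    return a.current().key < b.current().key;
                }
            };
        for (List<LongKeyBucket> shard : shardResults) {
            if (shard.isEmpty() == false) {                       // the constructor eagerly
                pq.add(new IteratorAndCurrent<>(shard.iterator())); // consumes one element
            }
        }
        while (pq.size() > 0) {
            IteratorAndCurrent<LongKeyBucket> top = pq.top();
            System.out.println(top.current());  // emitted in global key order
            if (top.hasNext()) {
                top.next();
                pq.updateTop();                 // cheaper than pop() + add()
            } else {
                pq.pop();
            }
        }
    }
}

Note that the constructor eagerly calls iterator.next(), so only non-empty
iterators may be wrapped; the reducers in this commit guard with an isEmpty()
check, as reduceMergeSort in InternalTerms does below.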

View File

@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
@ -38,7 +39,6 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@ -270,17 +270,6 @@ public final class InternalAutoDateHistogram extends
return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations);
}
private static class IteratorAndCurrent {
private final Iterator<Bucket> iterator;
private Bucket current;
IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
/**
* This method works almost exactly the same as
* InternalDateHistogram#reduceBuckets(List, ReduceContext), the different
@ -305,10 +294,10 @@ public final class InternalAutoDateHistogram extends
}
Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return a.current.key < b.current.key;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return a.current().key < b.current().key;
}
};
for (InternalAggregation aggregation : aggregations) {
@ -322,25 +311,24 @@ public final class InternalAutoDateHistogram extends
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
long key = reduceRounding.round(pq.top().current.key);
long key = reduceRounding.round(pq.top().current().key);
do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();
if (reduceRounding.round(top.current.key) != key) {
if (reduceRounding.round(top.current().key) != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = reduceRounding.round(top.current.key);
key = reduceRounding.round(top.current().key);
}
currentBuckets.add(top.current);
currentBuckets.add(top.current());
if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert top.current().key > key: "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();

View File

@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.io.IOException;
@ -39,7 +40,6 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@ -289,24 +289,11 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}
private static class IteratorAndCurrent {
private final Iterator<Bucket> iterator;
private Bucket current;
IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return a.current.key < b.current.key;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return a.current().key < b.current().key;
}
};
for (InternalAggregation aggregation : aggregations) {
@ -320,27 +307,26 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
double key = pq.top().current.key;
double key = pq.top().current().key;
do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();
if (top.current.key != key) {
if (top.current().key != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
}
currentBuckets.clear();
key = top.current.key;
key = top.current().key;
}
currentBuckets.add(top.current);
currentBuckets.add(top.current());
if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert top.current().key > key : "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();

View File

@ -31,12 +31,12 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@ -279,24 +279,12 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}
private static class IteratorAndCurrent {
private final Iterator<Bucket> iterator;
private Bucket current;
IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return Double.compare(a.current.key, b.current.key) < 0;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return Double.compare(a.current().key, b.current().key) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
@ -310,12 +298,12 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
double key = pq.top().current.key;
double key = pq.top().current().key;
do {
final IteratorAndCurrent top = pq.top();
final IteratorAndCurrent<Bucket> top = pq.top();
if (Double.compare(top.current.key, key) != 0) {
if (Double.compare(top.current().key, key) != 0) {
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
// Using Double.compare instead of != to handle NaN correctly.
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
@ -323,15 +311,14 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
reducedBuckets.add(reduced);
}
currentBuckets.clear();
key = top.current.key;
key = top.current().key;
}
currentBuckets.add(top.current);
currentBuckets.add(top.current());
if (top.iterator.hasNext()) {
final Bucket next = top.iterator.next();
assert Double.compare(next.key, top.current.key) > 0 : "shards must return data sorted by key";
top.current = next;
if (top.hasNext()) {
top.next();
assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();

View File

@ -29,13 +29,13 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -317,18 +317,6 @@ public class InternalVariableWidthHistogram
= */
private double nextKey(double key){ return key + 1; }
private static class IteratorAndCurrent {
private final Iterator<Bucket> iterator;
private Bucket current;
IteratorAndCurrent(Iterator<Bucket> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
@ -350,10 +338,10 @@ public class InternalVariableWidthHistogram
}
public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
return Double.compare(a.current.centroid, b.current.centroid) < 0;
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
return Double.compare(a.current().centroid, b.current().centroid) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
@ -365,27 +353,27 @@ public class InternalVariableWidthHistogram
List<Bucket> reducedBuckets = new ArrayList<>();
if(pq.size() > 0) {
double key = pq.top().current.centroid();
double key = pq.top().current().centroid();
// list of buckets coming from different shards that have the same key
List<Bucket> currentBuckets = new ArrayList<>();
do {
IteratorAndCurrent top = pq.top();
IteratorAndCurrent<Bucket> top = pq.top();
if (Double.compare(top.current.centroid(), key) != 0) {
if (Double.compare(top.current().centroid(), key) != 0) {
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = top.current.centroid();
key = top.current().centroid();
}
currentBuckets.add(top.current);
currentBuckets.add(top.current());
if (top.iterator.hasNext()) {
Bucket next = top.iterator.next();
assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid";
top.current = next;
if (top.hasNext()) {
Bucket prev = top.current();
top.next();
assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid";
pq.updateTop();
} else {
pq.pop();

View File

@ -45,7 +45,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
}
protected StringTerms buildEmptyTermsAggregation() {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
}

View File

@ -100,10 +100,10 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
}
}
public DoubleTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@ -121,7 +121,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
@Override
public DoubleTerms create(List<Bucket> buckets) {
return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -132,8 +132,8 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
}
@Override
protected DoubleTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
return new DoubleTerms(name, order, requiredSize, minDocCount, getMetadata(), format,
protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format,
shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
@ -58,6 +59,7 @@ import java.util.function.LongPredicate;
import java.util.function.LongUnaryOperator;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
/**
* An aggregator of string values that relies on global ordinals in order to build buckets.
@ -275,6 +277,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
LowCardinality(
String name,
AggregatorFactories factories,
Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
ValuesSource.Bytes.WithOrdinals valuesSource,
BucketOrder order,
DocValueFormat format,
@ -286,8 +289,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
boolean showTermDocCountError,
Map<String, Object> metadata
) throws IOException {
super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null,
context, parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata);
super(name, factories, resultStrategy, valuesSource, order, format, bucketCountThresholds, null, context,
parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata);
assert factories == null || factories.countAggregators() == 0;
this.segmentDocCounts = context.bigArrays().newIntArray(1, true);
}
@ -724,8 +727,15 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
@Override
StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBuckets, reduceOrder.comparator());
} else {
reduceOrder = order;
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(topBuckets), 0);
}
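// Sorting the shard's top buckets by key whenever the user-requested order is
// not already a key order is what establishes the invariant the coordinating
// node's merge sort relies on. The requested `order` is still applied during
// the final reduce, so the ordering of the response does not change.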

View File

@ -45,10 +45,10 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
protected long docCountError;
protected InternalMappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize,
boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
super(name, order, requiredSize, minDocCount, metadata);
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
this.format = format;
this.shardSize = shardSize;
this.showTermDocCountError = showTermDocCountError;

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,15 +33,20 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
extends InternalMultiBucketAggregation<A, B> implements Terms {
@ -152,12 +159,28 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
}
}
protected final BucketOrder reduceOrder;
protected final BucketOrder order;
protected final int requiredSize;
protected final long minDocCount;
protected InternalTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map<String, Object> metadata) {
/**
* Creates a new {@link InternalTerms}
* @param name The name of the aggregation
* @param reduceOrder The {@link BucketOrder} that should be used to merge shard results.
* @param order The {@link BucketOrder} that should be used to sort the final reduce.
* @param requiredSize The number of top buckets.
* @param minDocCount The minimum number of documents allowed per bucket.
* @param metadata The metadata associated with the aggregation.
*/
protected InternalTerms(String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata) {
super(name, metadata);
this.reduceOrder = reduceOrder;
this.order = order;
this.requiredSize = requiredSize;
this.minDocCount = minDocCount;
@ -168,13 +191,21 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
*/
protected InternalTerms(StreamInput in) throws IOException {
super(in);
order = InternalOrder.Streams.readOrder(in);
reduceOrder = InternalOrder.Streams.readOrder(in);
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
order = InternalOrder.Streams.readOrder(in);
} else {
order = reduceOrder;
}
requiredSize = readSize(in);
minDocCount = in.readVLong();
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
reduceOrder.writeTo(out);
}
order.writeTo(out);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
@ -189,21 +220,128 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
@Override
public abstract B getBucketByKey(String term);
@Override
private BucketOrder getReduceOrder(List<InternalAggregation> aggregations) {
BucketOrder thisReduceOrder = null;
for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
if (terms.getBuckets().size() == 0) {
continue;
}
if (thisReduceOrder == null) {
thisReduceOrder = terms.reduceOrder;
} else if (thisReduceOrder.equals(terms.reduceOrder) == false) {
return order;
}
}
return thisReduceOrder != null ? thisReduceOrder : order;
}
private long getDocCountError(InternalTerms<?, ?> terms) {
int size = terms.getBuckets().size();
if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) {
return 0;
} else if (InternalOrder.isCountDesc(terms.order)) {
if (terms.getDocCountError() > 0) {
// If there is an existing docCountError for this agg then
// use this as the error for this aggregation
return terms.getDocCountError();
} else {
// otherwise use the smallest doc count among the returned buckets
// (the last term when the buckets are sorted by descending count)
return terms.getBuckets().stream().mapToLong(Bucket::getDocCount).min().getAsLong();
}
} else {
return -1;
}
}
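// Worked example: with shardSize == 3 and a shard that sorts by descending
// count returning doc counts [10, 7, 4], any term the shard omitted has a
// count of at most 4, so 4 bounds that shard's error. min() is used above
// because the buckets handed to the reduce may now be sorted by key instead
// of by count, so the smallest count is not necessarily in the last position.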
private List<B> reduceMergeSort(List<InternalAggregation> aggregations,
BucketOrder reduceOrder, ReduceContext reduceContext) {
assert isKeyOrder(reduceOrder);
final Comparator<MultiBucketsAggregation.Bucket> cmp = reduceOrder.comparator();
final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
return cmp.compare(a.current(), b.current()) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
if (terms.getBuckets().isEmpty() == false) {
assert reduceOrder.equals(terms.reduceOrder);
pq.add(new IteratorAndCurrent<>(terms.getBuckets().iterator()));
}
}
List<B> reducedBuckets = new ArrayList<>();
// list of buckets coming from different shards that have the same key
List<B> currentBuckets = new ArrayList<>();
B lastBucket = null;
while (pq.size() > 0) {
final IteratorAndCurrent<B> top = pq.top();
assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final B reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
currentBuckets.clear();
}
lastBucket = top.current();
currentBuckets.add(top.current());
if (top.hasNext()) {
top.next();
assert cmp.compare(top.current(), lastBucket) > 0 : "shards must return data sorted by key";
pq.updateTop();
} else {
pq.pop();
}
}
if (currentBuckets.isEmpty() == false) {
final B reduced = reduceBucket(currentBuckets, reduceContext);
reducedBuckets.add(reduced);
}
return reducedBuckets;
}
private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<Object, List<B>> bucketMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
if (terms.getBuckets().isEmpty() == false) {
for (B bucket : terms.getBuckets()) {
List<B> bucketList = bucketMap.get(bucket.getKey());
if (bucketList == null) {
bucketList = new ArrayList<>();
bucketMap.put(bucket.getKey(), bucketList);
}
bucketList.add(bucket);
}
}
}
List<B> reducedBuckets = new ArrayList<>();
for (List<B> sameTermBuckets : bucketMap.values()) {
final B b = reduceBucket(sameTermBuckets, reduceContext);
reducedBuckets.add(b);
}
return reducedBuckets;
}
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<Object, List<B>> buckets = new HashMap<>();
long sumDocCountError = 0;
long otherDocCount = 0;
InternalTerms<A, B> referenceTerms = null;
for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
if (referenceTerms == null && !aggregation.getClass().equals(UnmappedTerms.class)) {
if (referenceTerms == null && aggregation.getClass().equals(UnmappedTerms.class) == false) {
referenceTerms = terms;
}
if (referenceTerms != null &&
!referenceTerms.getClass().equals(terms.getClass()) &&
!terms.getClass().equals(UnmappedTerms.class)) {
referenceTerms.getClass().equals(terms.getClass()) == false &&
terms.getClass().equals(UnmappedTerms.class) == false) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation ["
@ -211,22 +349,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
+ "types in two different indices");
}
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;
if (terms.getBuckets().size() < getShardSize() || InternalOrder.isKeyOrder(order)) {
thisAggDocCountError = 0;
} else if (InternalOrder.isCountDesc(order)) {
if (terms.getDocCountError() > 0) {
// If there is an existing docCountError for this agg then
// use this as the error for this aggregation
thisAggDocCountError = terms.getDocCountError();
} else {
// otherwise use the doc count of the last term in the
// aggregation
thisAggDocCountError = terms.getBuckets().get(terms.getBuckets().size() - 1).docCount;
}
} else {
thisAggDocCountError = -1;
}
final long thisAggDocCountError = getDocCountError(terms);
if (sumDocCountError != -1) {
if (thisAggDocCountError == -1) {
sumDocCountError = -1;
@ -244,28 +367,30 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
// Note that if the error is unbounded (-1) this will be fixed
// later in this method.
bucket.docCountError -= thisAggDocCountError;
List<B> bucketList = buckets.get(bucket.getKey());
if (bucketList == null) {
bucketList = new ArrayList<>();
buckets.put(bucket.getKey(), bucketList);
}
bucketList.add(bucket);
}
}
/**
* Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
* That allows us to perform a merge sort when reducing multiple aggregations together.
* For backward compatibility, we disable the merge sort and use {@link InternalTerms#reduceLegacy} if any of
* the provided aggregations use a different {@link InternalTerms#reduceOrder}.
*/
BucketOrder thisReduceOrder = getReduceOrder(aggregations);
List<B> reducedBuckets = isKeyOrder(thisReduceOrder) ?
reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext);
final B[] list;
if (reduceContext.isFinalReduce()) {
final int size = Math.min(requiredSize, buckets.size());
final int size = Math.min(requiredSize, reducedBuckets.size());
// final comparator
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
for (List<B> sameTermBuckets : buckets.values()) {
final B b = reduceBucket(sameTermBuckets, reduceContext);
for (B bucket : reducedBuckets) {
if (sumDocCountError == -1) {
b.docCountError = -1;
bucket.docCountError = -1;
} else {
b.docCountError += sumDocCountError;
bucket.docCountError += sumDocCountError;
}
if (b.docCount >= minDocCount) {
B removed = ordered.insertWithOverflow(b);
if (bucket.docCount >= minDocCount) {
B removed = ordered.insertWithOverflow(bucket);
if (removed != null) {
otherDocCount += removed.getDocCount();
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
@ -273,7 +398,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
reduceContext.consumeBucketsAndMaybeBreak(1);
}
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
}
}
list = createBucketsArray(ordered.size());
@ -281,19 +406,18 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
list[i] = ordered.pop();
}
} else {
// keep all buckets on partial reduce
// TODO: we could prune the buckets when sorting by key
list = createBucketsArray(buckets.size());
int pos = 0;
for (List<B> sameTermBuckets : buckets.values()) {
final B b = reduceBucket(sameTermBuckets, reduceContext);
// we can prune the list on partial reduce if the aggregation is ordered by key
// and not filtered (minDocCount == 0)
int size = isKeyOrder(order) && minDocCount == 0 ? Math.min(requiredSize, reducedBuckets.size()) : reducedBuckets.size();
list = createBucketsArray(size);
for (int i = 0; i < size; i++) {
reduceContext.consumeBucketsAndMaybeBreak(1);
list[i] = reducedBuckets.get(i);
if (sumDocCountError == -1) {
b.docCountError = -1;
list[i].docCountError = -1;
} else {
b.docCountError += sumDocCountError;
list[i].docCountError += sumDocCountError;
}
list[pos++] = b;
}
}
long docCountError;
@ -302,7 +426,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
} else {
docCountError = aggregations.size() == 1 ? 0 : sumDocCountError;
}
return create(name, Arrays.asList(list), docCountError, otherDocCount);
return create(name, Arrays.asList(list), thisReduceOrder, docCountError, otherDocCount);
}
@Override
@ -334,7 +458,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
protected abstract int getShardSize();
protected abstract A create(String name, List<B> buckets, long docCountError, long otherDocCount);
protected abstract A create(String name, List<B> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount);
/**
* Create an array to hold some buckets. Used in collecting the results.
@ -351,13 +475,14 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
InternalTerms<?,?> that = (InternalTerms<?,?>) obj;
return Objects.equals(minDocCount, that.minDocCount)
&& Objects.equals(reduceOrder, that.reduceOrder)
&& Objects.equals(order, that.order)
&& Objects.equals(requiredSize, that.requiredSize);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), minDocCount, order, requiredSize);
return Objects.hash(super.hashCode(), minDocCount, reduceOrder, order, requiredSize);
}
protected static XContentBuilder doXContentCommon(XContentBuilder builder, Params params,
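
The serialization change in this file follows the usual version-gated wire
pattern: the new reduceOrder field is written only to nodes that can read it,
and when reading from an older node the single serialized order doubles as the
reduce order (valid because pre-7.10 shards sort their buckets by that order).
A generic standalone sketch of the pattern, using plain data streams and a
made-up version constant rather than Elasticsearch's StreamInput/StreamOutput:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class VersionGateSketch {
    static final int V_7_10_0 = 7_10_00;  // hypothetical wire-version id

    // Writer side, mirroring doWriteTo: the new reduceOrder field is sent only
    // to receivers that know how to read it; order is always sent.
    static void write(DataOutputStream out, int receiverVersion,
                      byte reduceOrderId, byte orderId) throws IOException {
        if (receiverVersion >= V_7_10_0) {
            out.writeByte(reduceOrderId);
        }
        out.writeByte(orderId);
    }

    // Reader side, mirroring the stream constructor: a pre-7.10 sender wrote a
    // single order, and since its shards sort buckets by that order it doubles
    // as the reduce order.
    static byte[] readReduceOrderAndOrder(DataInputStream in, int senderVersion) throws IOException {
        byte reduceOrderId = in.readByte();
        byte orderId = senderVersion >= V_7_10_0 ? in.readByte() : reduceOrderId;
        return new byte[] { reduceOrderId, orderId };
    }
}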

View File

@ -100,10 +100,10 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
}
}
public LongTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@ -121,7 +121,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
@Override
public LongTerms create(List<Bucket> buckets) {
return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -132,8 +132,8 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
}
@Override
protected LongTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
return new LongTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
protected LongTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -168,7 +168,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
bucket.getDocCount(), (InternalAggregations) bucket.getAggregations(), longTerms.showTermDocCountError,
longTerms.showTermDocCountError ? bucket.getDocCountError() : 0, decimalFormat));
}
return new DoubleTerms(longTerms.getName(), longTerms.order, longTerms.requiredSize,
return new DoubleTerms(longTerms.getName(), longTerms.reduceOrder, longTerms.order, longTerms.requiredSize,
longTerms.minDocCount,
longTerms.metadata, longTerms.format, longTerms.shardSize,
longTerms.showTermDocCountError, longTerms.otherDocCount,

View File

@ -50,6 +50,8 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.LongConsumer;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
/**
* An aggregator of string values that hashes the strings on the fly rather
* than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
@ -226,7 +228,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
@ -416,9 +418,16 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
@Override
StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
Arrays.asList(topBuckets), 0);
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBuckets, reduceOrder.comparator());
} else {
reduceOrder = order;
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(topBuckets), 0);
}
@Override

View File

@ -48,12 +48,14 @@ import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
public class NumericTermsAggregator extends TermsAggregator {
private final ResultStrategy<?, ?> resultStrategy;
@ -189,7 +191,8 @@ public class NumericTermsAggregator extends TermsAggregator {
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx],
topBucketsPerOrd[ordIdx]);
}
return result;
}
@ -363,8 +366,16 @@ public class NumericTermsAggregator extends TermsAggregator {
@Override
LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBuckets, reduceOrder.comparator());
} else {
reduceOrder = order;
}
return new LongTerms(
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
@ -383,6 +394,7 @@ public class NumericTermsAggregator extends TermsAggregator {
return new LongTerms(
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
@ -397,6 +409,7 @@ public class NumericTermsAggregator extends TermsAggregator {
}
class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> {
DoubleTermsResults(boolean showTermDocCountError) {
super(showTermDocCountError);
}
@ -435,8 +448,16 @@ public class NumericTermsAggregator extends TermsAggregator {
@Override
DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBuckets, reduceOrder.comparator());
} else {
reduceOrder = order;
}
return new DoubleTerms(
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
@ -455,6 +476,7 @@ public class NumericTermsAggregator extends TermsAggregator {
return new DoubleTerms(
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),

View File

@ -103,10 +103,10 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
}
}
public StringTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
super(name, order, requiredSize, minDocCount, metadata, format,
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -124,7 +124,7 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
@Override
public StringTerms create(List<Bucket> buckets) {
return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -140,8 +140,8 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
}
@Override
protected StringTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
return new StringTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
protected StringTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
}

View File

@ -374,8 +374,9 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
* which directly linked to maxOrd, so we need to limit).
*/
return new GlobalOrdinalsStringTermsAggregator.LowCardinality(name, factories,
ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false,
subAggCollectMode, showTermDocCountError, metadata);
a -> a.new StandardTermsResults(),
ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false,
subAggCollectMode, showTermDocCountError, metadata);
}
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);

View File

@ -51,7 +51,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
}
public UnmappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map<String, Object> metadata) {
super(name, order, requiredSize, minDocCount, metadata);
super(name, order, order, requiredSize, minDocCount, metadata);
}
/**
@ -92,7 +92,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
}
@Override
protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
protected UnmappedTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
throw new UnsupportedOperationException("not supported for UnmappedTerms");
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -37,7 +36,6 @@ import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.ArrayList;
@ -46,7 +44,6 @@ import java.util.List;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class InternalAggregationsTests extends ESTestCase {
@ -61,7 +58,7 @@ public class InternalAggregationsTests extends ESTestCase {
}
public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
List<InternalAggregations> aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms)));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
@ -70,7 +67,7 @@ public class InternalAggregationsTests extends ESTestCase {
}
public void testFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms));
@ -132,30 +129,14 @@ public class InternalAggregationsTests extends ESTestCase {
writeToAndReadFrom(aggregations, 0);
}
public void testGetTopLevelPipelineAggregators() throws Exception {
PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree();
InternalAggregations aggs = createTestInstance(pipelineTree);
assertThat(aggs.getTopLevelPipelineAggregators(), equalTo(pipelineTree.aggregators()));
}
private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
Version version = VersionUtils.randomVersion(random());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
aggregations.writeTo(out);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) {
in.setVersion(version);
InternalAggregations deserialized = InternalAggregations.readFrom(in);
assertEquals(aggregations.aggregations, deserialized.aggregations);
if (iteration < 2) {
/*
* Add the pipeline tree for bwc serialization just like we
* do when we merge the aggregation. Without that we can't
* properly serialize to older versions.
*/
InternalAggregations asThoughReduced = new InternalAggregations(
deserialized.copyResults(), aggregations.getPipelineTreeForBwcSerialization());
writeToAndReadFrom(asThoughReduced, iteration + 1);
writeToAndReadFrom(deserialized, iteration + 1);
}
}
}

View File

@ -141,7 +141,7 @@ public class InternalMultiBucketAggregationTests extends ESTestCase {
new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
internalStringAggs, false, 0, DocValueFormat.RAW));
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0,
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0,
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg));
LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);
@ -161,7 +161,7 @@ public class InternalMultiBucketAggregationTests extends ESTestCase {
new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
internalStringAggs, false, 0, DocValueFormat.RAW));
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0,
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0,
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg));
LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -53,7 +54,9 @@ public class DoubleTermsTests extends InternalTermsTestCase {
int docCount = randomIntBetween(1, 100);
buckets.add(new DoubleTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
}
return new DoubleTerms(name, order, requiredSize, minDocCount,
BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
Collections.sort(buckets, reduceOrder.comparator());
return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount,
metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -115,7 +118,8 @@ public class DoubleTermsTests extends InternalTermsTestCase {
default:
throw new AssertionError("Illegal randomisation branch");
}
return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
Collections.sort(buckets, doubleTerms.reduceOrder.comparator());
return new DoubleTerms(name, doubleTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
} else {
String name = instance.getName();

View File

@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -53,7 +54,9 @@ public class LongTermsTests extends InternalTermsTestCase {
int docCount = randomIntBetween(1, 100);
buckets.add(new LongTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
}
return new LongTerms(name, order, requiredSize, minDocCount,
BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
Collections.sort(buckets, reduceOrder.comparator());
return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount,
metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -115,7 +118,8 @@ public class LongTermsTests extends InternalTermsTestCase {
default:
throw new AssertionError("Illegal randomisation branch");
}
return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
Collections.sort(buckets, longTerms.reduceOrder.comparator());
return new LongTerms(name, longTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
} else {
String name = instance.getName();

View File

@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -54,7 +55,9 @@ public class StringTermsTests extends InternalTermsTestCase {
int docCount = randomIntBetween(1, 100);
buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
}
return new StringTerms(name, order, requiredSize, minDocCount,
BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
Collections.sort(buckets, reduceOrder.comparator());
return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@ -116,7 +119,8 @@ public class StringTermsTests extends InternalTermsTestCase {
default:
throw new AssertionError("Illegal randomisation branch");
}
return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
Collections.sort(buckets, stringTerms.reduceOrder.comparator());
return new StringTerms(name, stringTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
showTermDocCountError, otherDocCount, buckets, docCountError);
} else {
String name = instance.getName();

View File

@ -39,6 +39,7 @@ import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
@ -76,6 +77,7 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
@ -270,7 +272,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildTopLevel();
Terms result = reduce(aggregator);
assertEquals(5, result.getBuckets().size());
assertEquals("", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
@ -335,11 +337,11 @@ public class TermsAggregatorTests extends AggregatorTestCase {
.size(12)
.order(BucketOrder.key(true));
Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
Terms result = (Terms) aggregator.buildTopLevel();
Terms result = reduce(aggregator);
assertEquals(10, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -374,7 +376,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildTopLevel();
result = reduce(aggregator);
assertEquals(5, result.getBuckets().size());
assertEquals("val001", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -398,7 +400,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildTopLevel();
result = reduce(aggregator);
assertEquals(8, result.getBuckets().size());
assertEquals("val002", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@ -427,7 +429,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
result = (Terms) aggregator.buildTopLevel();
result = reduce(aggregator);
assertEquals(2, result.getBuckets().size());
assertEquals("val010", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -444,7 +446,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals(2, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -462,7 +464,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals(2, result.getBuckets().size());
assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -513,11 +515,11 @@ public class TermsAggregatorTests extends AggregatorTestCase {
.includeExclude(new IncludeExclude(new long[]{0, 5}, null))
.field("long_field")
.order(BucketOrder.key(true));
-Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
+TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals(2, result.getBuckets().size());
assertEquals(0L, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -534,7 +536,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals(4, result.getBuckets().size());
assertEquals(1L, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -557,7 +559,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals(2, result.getBuckets().size());
assertEquals(0.0, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -574,7 +576,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals(4, result.getBuckets().size());
assertEquals(1.0, result.getBuckets().get(0).getKey());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -740,7 +742,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals(size, result.getBuckets().size());
for (int i = 0; i < size; i++) {
Map.Entry<T, Integer> expected = expectedBuckets.get(i);
@@ -766,7 +768,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = ((Filter) aggregator.buildTopLevel()).getAggregations().get("_name2");
+result = ((Filter) reduce(aggregator)).getAggregations().get("_name2");
int expectedFilteredCounts = 0;
for (Integer count : filteredCounts.values()) {
if (count > 0) {
@@ -839,7 +841,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals(size, result.getBuckets().size());
for (int i = 0; i < size; i++) {
Map.Entry<T, Long> expected = expectedBuckets.get(i);
@@ -868,7 +870,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
@@ -878,7 +880,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
@@ -888,7 +890,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-result = (Terms) aggregator.buildTopLevel();
+result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
}
@@ -911,7 +913,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(0, result.getBuckets().size());
assertFalse(AggregationInspectionHelper.hasValue((InternalTerms)result));
@@ -947,7 +949,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(1, result.getBuckets().size());
assertEquals(missingValues[i], result.getBuckets().get(0).getKey());
@@ -1019,7 +1021,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals("_name", result.getName());
assertEquals(1, result.getBuckets().size());
assertEquals("192.168.100.42", result.getBuckets().get(0).getKey());
@@ -1067,7 +1069,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
-Terms result = (Terms) aggregator.buildTopLevel();
+Terms result = reduce(aggregator);
assertEquals(3, result.getBuckets().size());
assertEquals("a", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -1464,4 +1466,18 @@ public class TermsAggregatorTests extends AggregatorTestCase {
return aggregator.buildTopLevel();
}
+
+private <T extends InternalAggregation> T reduce(Aggregator agg) throws IOException {
+// now do the final reduce
+MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer =
+new MultiBucketConsumerService.MultiBucketConsumer(Integer.MAX_VALUE,
+new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
+InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
+agg.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, PipelineTree.EMPTY);
+T topLevel = (T) agg.buildTopLevel();
+T result = (T) topLevel.reduce(Collections.singletonList(topLevel), context);
+doAssertReducedMultiBucketConsumer(result, reduceBucketConsumer);
+return result;
+}
+
}
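
Note: the tests above now route results through the new reduce(...) helper instead of casting aggregator.buildTopLevel() directly, so each assertion runs against a final reduction rather than a raw shard-level result. For intuition about the merge-sort style of that reduction, here is a minimal, self-contained sketch of merging per-shard bucket lists that are already sorted by a shared key order. This is an illustrative sketch only: Bucket, PeekingIterator, and this reduce() are made-up stand-ins, not the Elasticsearch types touched by this commit.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeReduceSketch {
    // Illustrative stand-in for a terms bucket: a key plus a doc count.
    static final class Bucket {
        final String key;
        long docCount;
        Bucket(String key, long docCount) { this.key = key; this.docCount = docCount; }
    }

    // An iterator that keeps hold of the element it is currently positioned on.
    static final class PeekingIterator {
        final Iterator<Bucket> it;
        Bucket current;
        PeekingIterator(Iterator<Bucket> it) { this.it = it; this.current = it.next(); }
        boolean advance() {
            if (it.hasNext()) { current = it.next(); return true; }
            return false;
        }
    }

    // Merges shard bucket lists that are pre-sorted by key, summing the doc
    // counts of equal keys, without looking any bucket up in a global map.
    static List<Bucket> reduce(List<List<Bucket>> shards) {
        PriorityQueue<PeekingIterator> pq =
            new PriorityQueue<>(Comparator.comparing((PeekingIterator p) -> p.current.key));
        for (List<Bucket> shard : shards) {
            if (shard.isEmpty() == false) {
                pq.add(new PeekingIterator(shard.iterator()));
            }
        }
        List<Bucket> reduced = new ArrayList<>();
        while (pq.isEmpty() == false) {
            PeekingIterator top = pq.poll();      // stream with the smallest current key
            Bucket bucket = top.current;
            Bucket last = reduced.isEmpty() ? null : reduced.get(reduced.size() - 1);
            if (last != null && last.key.equals(bucket.key)) {
                last.docCount += bucket.docCount; // same term seen on another shard
            } else {
                reduced.add(new Bucket(bucket.key, bucket.docCount));
            }
            if (top.advance()) {
                pq.add(top);                      // re-queue at its next bucket
            }
        }
        return reduced;
    }
}

For example, reduce(List.of(List.of(new Bucket("a", 2), new Bucket("b", 1)), List.of(new Bucket("a", 1)))) yields a:3 followed by b:1, visiting each bucket exactly once in key order.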

View File

@@ -39,7 +39,6 @@ import org.elasticsearch.search.aggregations.InternalAggregationsTests;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Base64;
@@ -77,8 +76,7 @@ public class QuerySearchResultTests extends ESTestCase {
public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
-Version version = VersionUtils.randomVersion(random());
-QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
+QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new);
assertEquals(querySearchResult.getContextId(), deserialized.getContextId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);

View File

@@ -146,8 +146,8 @@ public class AsyncSearchTaskTests extends ESTestCase {
AsyncSearchTask task = createAsyncSearchTask();
task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
-InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
-Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true),
+BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
//providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
//causing an exception when executing getResponse as part of the completion listener callback
DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
@@ -184,8 +184,8 @@ public class AsyncSearchTaskTests extends ESTestCase {
AsyncSearchTask task = createAsyncSearchTask();
task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
-InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
-Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true),
+BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
//providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
//causing an exception when executing getResponse as part of the completion listener callback
DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
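
Note: the two hunks above show that StringTerms now takes a pair of BucketOrder arguments. Matching the positions against the constructor call earlier in this diff (name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError), an annotated version of the test's call would read as follows; the parameter roles are inferred from that call, not taken from documentation:

InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(
    new StringTerms(
        "name",
        BucketOrder.key(true),   // reduceOrder: the sort shards and partial reduces use
        BucketOrder.key(true),   // order: the sort requested for the final response
        1,                       // requiredSize
        1,                       // minDocCount
        Collections.emptyMap(),  // metadata
        DocValueFormat.RAW,      // format
        1,                       // shardSize
        false,                   // showTermDocCountError
        1,                       // otherDocCount
        Collections.emptyList(), // buckets
        0)));                    // docCountError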