Decouple pipeline reductions from final agg reduction (#45796)

Historically only two things happened in the final reduction:
empty buckets were filled, and pipeline aggs were reduced (since it
was the final reduction, this was safe).  Usage of the final reduction
is growing however.  Auto-date-histo might need to perform
many reductions on final-reduce to merge down buckets, CCS
may need to side-step the final reduction if sending to a
different cluster, etc

Having pipelines generate their output in the final reduce was
convenient, but is becoming increasingly difficult to manage
as the rest of the agg framework advances.

This commit decouples pipeline aggs from the final reduction by
introducing a new "top level" reduce, which should be called
at the beginning of the reduce cycle (e.g. from the SearchPhaseController).
This will only reduce pipeline aggs on the final reduce after
the non-pipeline agg tree has been fully reduced.

By separating pipeline reduction into their own set of methods,
aggregations are free to use the final reduction for whatever
purpose without worrying about generating pipeline results
which are non-reducible
This commit is contained in:
Zachary Tong 2019-12-05 16:11:27 -05:00 committed by Zachary Tong
parent 721a8b3d9c
commit fec882a457
72 changed files with 223 additions and 122 deletions

View File

@ -233,7 +233,7 @@ public class InternalMatrixStats extends InternalAggregation implements MatrixSt
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// merge stats across all shards
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);

View File

@ -487,7 +487,7 @@ public final class SearchPhaseController {
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, reduceContext);
InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
@ -625,7 +625,7 @@ public final class SearchPhaseController {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}

View File

@ -196,7 +196,7 @@ final class SearchResponseMerger {
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
setSuggestShardIndex(shards, groupedSuggestions);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from

View File

@ -126,23 +126,25 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
return name;
}
/**
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
* be called after all aggregations have been fully reduced
*/
public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
assert reduceContext.isFinalReduce();
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext);
}
return reducedAggs;
}
/**
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
* try reusing an existing instance (typically the first in the given list) to save on redundant object
* construction.
*/
public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalAggregation aggResult = doReduce(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
}
}
return aggResult;
}
public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
/**
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns

View File

@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* An internal implementation of {@link Aggregations}.
@ -98,10 +99,47 @@ public final class InternalAggregations extends Aggregations implements Writeabl
return topLevelPipelineAggregators;
}
@SuppressWarnings("unchecked")
private List<InternalAggregation> getInternalAggregations() {
return (List<InternalAggregation>) aggregations;
}
/**
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
*
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
*/
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
InternalAggregations reduced = reduce(aggregationsList, context);
if (reduced == null) {
return null;
}
if (context.isFinalReduce()) {
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
reducedInternalAggs = reducedInternalAggs.stream()
.map(agg -> agg.reducePipelines(agg, context))
.collect(Collectors.toList());
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
InternalAggregation newAgg
= pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context);
reducedInternalAggs.add(newAgg);
}
return new InternalAggregations(reducedInternalAggs);
}
return reduced;
}
/**
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
* {@link InternalAggregations} object found in the list.
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
if (aggregationsList.isEmpty()) {
@ -130,13 +168,6 @@ public final class InternalAggregations extends Aggregations implements Writeabl
reducedAggregations.add(first.reduce(aggregations, context));
}
if (context.isFinalReduce()) {
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
reducedAggregations.add(newAgg);
}
return new InternalAggregations(reducedAggregations);
}
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -73,7 +74,7 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);
@Override
public abstract List<? extends InternalBucket> getBuckets();
public abstract List<B> getBuckets();
@Override
public Object getProperty(List<String> path) {
@ -141,6 +142,30 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
return size;
}
/**
* Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
* agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines
* to materialize
*/
@Override
public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
assert reduceContext.isFinalReduce();
List<B> materializedBuckets = reducePipelineBuckets(reduceContext);
return super.reducePipelines(create(materializedBuckets), reduceContext);
}
private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
List<InternalAggregation> aggs = new ArrayList<>();
for (Aggregation agg : bucket.getAggregations()) {
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext));
}
reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
}
return reducedBuckets;
}
public abstract static class InternalBucket implements Bucket, Writeable {
public Object getProperty(String containingAggName, List<String> path) {

View File

@ -97,7 +97,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {

View File

@ -181,7 +181,7 @@ public class InternalAdjacencyMatrix
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;

View File

@ -156,7 +156,7 @@ public class InternalComposite
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
for (InternalAggregation agg : aggregations) {
InternalComposite sortedAgg = (InternalComposite) agg;

View File

@ -189,7 +189,7 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<List<InternalBucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalFilters filters = (InternalFilters) aggregation;

View File

@ -81,7 +81,7 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket>
}
@Override
public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalGeoGrid grid = (InternalGeoGrid) aggregation;

View File

@ -498,7 +498,7 @@ public final class InternalAutoDateHistogram extends
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {

View File

@ -448,7 +448,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {

View File

@ -421,7 +421,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {

View File

@ -238,7 +238,7 @@ public final class InternalBinaryRange
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
long[] docCounts = new long[buckets.size()];
InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];

View File

@ -295,7 +295,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
@SuppressWarnings("unchecked")
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(ranges.size());
List<B>[] rangeList = new List[ranges.size()];
for (int i = 0; i < rangeList.length; ++i) {

View File

@ -49,7 +49,7 @@ public class UnmappedSampler extends InternalSampler {
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
return new UnmappedSampler(name, pipelineAggregators(), metaData);
}

View File

@ -126,7 +126,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
spare.bucketOrd = bucketOrd;
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);

View File

@ -105,6 +105,9 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
return subsetSize;
}
// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
// the score
void updateScore(SignificanceHeuristic significanceHeuristic) {
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
}
@ -191,7 +194,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
public abstract List<B> getBuckets();
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long globalSubsetSize = 0;
long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the

View File

@ -42,14 +42,9 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
long term;
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
DocValueFormat format) {
DocValueFormat format, double score) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.term = term;
}
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
double score) {
this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null);
this.score = score;
}
@ -134,7 +129,7 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
@Override
public Bucket createBucket(InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
return new Bucket(prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, prototype.term,
aggregations, prototype.format);
aggregations, prototype.format, prototype.score);
}
@Override
@ -151,6 +146,6 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
@Override
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format);
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format, prototype.score);
}
}

View File

@ -88,7 +88,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
continue;
}
if (spare == null) {
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format);
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
}
spare.term = bucketOrds.get(i);
spare.subsetDf = docCount;

View File

@ -43,9 +43,10 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
BytesRef termBytes;
public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
DocValueFormat format) {
DocValueFormat format, double score) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.termBytes = term;
this.score = score;
}
/**
@ -69,12 +70,6 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
aggregations.writeTo(out);
}
public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, double score, DocValueFormat format) {
this(term, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.score = score;
}
@Override
public Number getKeyAsNumber() {
// this method is needed for scripted numeric aggregations
@ -139,7 +134,7 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
@Override
public Bucket createBucket(InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) {
return new Bucket(prototype.termBytes, prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize,
aggregations, prototype.format);
aggregations, prototype.format, prototype.score);
}
@Override
@ -156,6 +151,6 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
@Override
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, SignificantStringTerms.Bucket prototype) {
return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
return new Bucket(prototype.termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format, prototype.score);
}
}

View File

@ -91,7 +91,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
bucketOrds.get(i, spare.termBytes);

View File

@ -201,7 +201,7 @@ public class SignificantTextAggregator extends BucketsAggregator {
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
bucketOrds.get(i, spare.termBytes);

View File

@ -105,7 +105,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData);
}

View File

@ -144,7 +144,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
boolean promoteToDouble = false;
for (InternalAggregation agg : aggregations) {
if (agg instanceof LongTerms && ((LongTerms) agg).format == DocValueFormat.RAW) {
@ -157,7 +157,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
}
}
if (promoteToDouble == false) {
return super.doReduce(aggregations, reduceContext);
return super.reduce(aggregations, reduceContext);
}
List<InternalAggregation> newAggs = new ArrayList<>(aggregations.size());
for (InternalAggregation agg : aggregations) {
@ -168,7 +168,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
newAggs.add(agg);
}
}
return newAggs.get(0).doReduce(newAggs, reduceContext);
return newAggs.get(0).reduce(newAggs, reduceContext);
}
@Override

View File

@ -87,7 +87,7 @@ public abstract class InternalMappedRareTerms<A extends InternalRareTerms<A, B>,
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<Object, List<B>> buckets = new HashMap<>();
InternalRareTerms<A, B> referenceTerms = null;
SetBackedScalingCuckooFilter filter = null;

View File

@ -154,7 +154,7 @@ public abstract class InternalRareTerms<A extends InternalRareTerms<A, B>, B ext
public abstract B getBucketByKey(String term);
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException();
}

View File

@ -192,7 +192,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
public abstract B getBucketByKey(String term);
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
Map<Object, List<B>> buckets = new HashMap<>();
long sumDocCountError = 0;
long otherDocCount = 0;

View File

@ -144,13 +144,13 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation agg : aggregations) {
if (agg instanceof DoubleTerms) {
return agg.doReduce(aggregations, reduceContext);
return agg.reduce(aggregations, reduceContext);
}
}
return super.doReduce(aggregations, reduceContext);
return super.reduce(aggregations, reduceContext);
}
@Override

View File

@ -93,7 +93,7 @@ public class UnmappedRareTerms extends InternalRareTerms<UnmappedRareTerms, Unma
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
return new UnmappedRareTerms(name, pipelineAggregators(), metaData);
}

View File

@ -99,7 +99,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData);
}

View File

@ -103,7 +103,7 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
}
@Override
public AbstractInternalHDRPercentiles doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
DoubleHistogram merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalHDRPercentiles percentiles = (AbstractInternalHDRPercentiles) aggregation;

View File

@ -87,7 +87,7 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
}
@Override
public AbstractInternalTDigestPercentiles doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public AbstractInternalTDigestPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
TDigestState merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalTDigestPercentiles percentiles = (AbstractInternalTDigestPercentiles) aggregation;

View File

@ -87,7 +87,7 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalAvg doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
long count = 0;
// Compute the sum of double values with Kahan summation algorithm which is more

View File

@ -85,7 +85,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalCardinality reduced = null;
for (InternalAggregation aggregation : aggregations) {
final InternalCardinality cardinality = (InternalCardinality) aggregation;

View File

@ -140,7 +140,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
@Override
public InternalExtendedStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double sumOfSqrs = 0;
double compensationOfSqrs = 0;
for (InternalAggregation aggregation : aggregations) {
@ -158,7 +158,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
sumOfSqrs = newSumOfSqrs;
}
}
final InternalStats stats = super.doReduce(aggregations, reduceContext);
final InternalStats stats = super.reduce(aggregations, reduceContext);
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma,
format, pipelineAggregators(), getMetaData());
}

View File

@ -93,7 +93,7 @@ public class InternalGeoBounds extends InternalAggregation implements GeoBounds
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double top = Double.NEGATIVE_INFINITY;
double bottom = Double.POSITIVE_INFINITY;
double posLeft = Double.POSITIVE_INFINITY;

View File

@ -114,7 +114,7 @@ public class InternalGeoCentroid extends InternalAggregation implements GeoCentr
}
@Override
public InternalGeoCentroid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double lonSum = Double.NaN;
double latSum = Double.NaN;
int totalCount = 0;

View File

@ -71,7 +71,7 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double max = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : aggregations) {
max = Math.max(max, ((InternalMax) aggregation).max);

View File

@ -80,7 +80,7 @@ public class InternalMedianAbsoluteDeviation extends InternalNumericMetricsAggre
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
final TDigestState valueMerged = new TDigestState(valuesSketch.compression());
for (InternalAggregation aggregation : aggregations) {
final InternalMedianAbsoluteDeviation madAggregation = (InternalMedianAbsoluteDeviation) aggregation;

View File

@ -71,7 +71,7 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMin doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMin reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double min = Double.POSITIVE_INFINITY;
for (InternalAggregation aggregation : aggregations) {
min = Math.min(min, ((InternalMin) aggregation).min);

View File

@ -85,7 +85,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Object> aggregationObjects = new ArrayList<>();
for (InternalAggregation aggregation : aggregations) {
InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation;

View File

@ -145,7 +145,7 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
@Override
public InternalStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;

View File

@ -71,7 +71,7 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalSum doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSum reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
CompensatedSum kahanSummation = new CompensatedSum(0, 0);

View File

@ -99,7 +99,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits {
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
final SearchHits[] shardHits = new SearchHits[aggregations.size()];
final int from;
final int size;

View File

@ -70,7 +70,7 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long valueCount = 0;
for (InternalAggregation aggregation : aggregations) {
valueCount += ((InternalValueCount) aggregation).value;

View File

@ -87,7 +87,7 @@ public class InternalWeightedAvg extends InternalNumericMetricsAggregation.Singl
}
@Override
public InternalWeightedAvg doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalWeightedAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
CompensatedSum sumCompensation = new CompensatedSum(0, 0);
CompensatedSum weightCompensation = new CompensatedSum(0, 0);

View File

@ -85,7 +85,7 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View File

@ -48,7 +48,7 @@ public class InternalExtendedStatsBucket extends InternalExtendedStats implement
}
@Override
public InternalExtendedStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View File

@ -126,7 +126,7 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation
}
@Override
public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View File

@ -76,7 +76,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
}
@Override
public InternalSimpleValue doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSimpleValue reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View File

@ -47,7 +47,7 @@ public class InternalStatsBucket extends InternalStats implements StatsBucket {
}
@Override
public InternalStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View File

@ -62,7 +62,7 @@ public class InternalAggregationsTests extends ESTestCase {
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
topLevelPipelineAggs));
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext);
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(1, reducedAggs.aggregations.size());
}
@ -78,11 +78,11 @@ public class InternalAggregationsTests extends ESTestCase {
if (randomBoolean()) {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext);
} else {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext);
}
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(2, reducedAggs.aggregations.size());

View File

@ -36,10 +36,15 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
import org.junit.Assert;
@ -58,9 +63,12 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
private static final String DATE_FIELD = "date";
private static final String INSTANT_FIELD = "instant";
private static final String NUMERIC_FIELD = "numeric";
private static final List<ZonedDateTime> DATES_WITH_TIME = Arrays.asList(
ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC),
@ -718,6 +726,35 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
);
}
public void testWithPipelineReductions() throws IOException {
testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD)
.subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1)
.subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))),
histogram -> {
assertTrue(AggregationInspectionHelper.hasValue(histogram));
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(1, buckets.size());
Histogram.Bucket bucket = buckets.get(0);
assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString());
assertEquals(10, bucket.getDocCount());
assertThat(bucket.getAggregations().asList().size(), equalTo(1));
InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0);
assertThat(histo.getBuckets().size(), equalTo(10));
for (int i = 0; i < 10; i++) {
assertThat(histo.getBuckets().get(i).key, equalTo((double)i));
assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i));
if (i > 0) {
assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0));
}
}
});
}
private void testSearchCase(final Query query, final List<ZonedDateTime> dataset,
final Consumer<AutoDateHistogramAggregationBuilder> configure,
final Consumer<InternalAutoDateHistogram> verify) throws IOException {
@ -757,6 +794,7 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
final Document document = new Document();
int i = 0;
for (final ZonedDateTime date : dataset) {
if (frequently()) {
indexWriter.commit();
@ -765,8 +803,10 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
final long instant = date.toInstant().toEpochMilli();
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new LongPoint(INSTANT_FIELD, instant));
document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
indexWriter.addDocument(document);
document.clear();
i += 1;
}
}
@ -783,11 +823,19 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
fieldType.setHasDocValues(true);
fieldType.setName(aggregationBuilder.field());
MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
instantFieldType.setName(INSTANT_FIELD);
instantFieldType.setHasDocValues(true);
MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
numericFieldType.setName(NUMERIC_FIELD);
numericFieldType.setHasDocValues(true);
final InternalAutoDateHistogram histogram;
if (reduced) {
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
} else {
histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
}
verify.accept(histogram);
}

View File

@ -108,7 +108,6 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
assertThat(result, equalTo(2));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39497")
public void testReduceRandom() {
super.testReduceRandom();
}

View File

@ -109,7 +109,7 @@ public class InternalHistogramTests extends InternalMultiBucketAggregationTestCa
newBuckets.add(new InternalHistogram.Bucket(Double.NaN, b.docCount, keyed, b.format, b.aggregations));
InternalHistogram newHistogram = histogram.create(newBuckets);
newHistogram.doReduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false));
newHistogram.reduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false));
}
@Override

View File

@ -126,12 +126,12 @@ public class SignificanceHeuristicTests extends ESTestCase {
InternalMappedSignificantTerms<?, ?> getRandomSignificantTerms(SignificanceHeuristic heuristic) {
if (randomBoolean()) {
SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY,
DocValueFormat.RAW);
DocValueFormat.RAW, randomDoubleBetween(0, 100, true));
return new SignificantLongTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic,
singletonList(bucket));
} else {
SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(new BytesRef("someterm"), 1, 2, 3, 4,
InternalAggregations.EMPTY, DocValueFormat.RAW);
InternalAggregations.EMPTY, DocValueFormat.RAW, randomDoubleBetween(0, 100, true));
return new SignificantStringTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic,
singletonList(bucket));
}
@ -149,7 +149,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
public void testReduce() {
List<InternalAggregation> aggs = createInternalAggregations();
InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, true);
SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, context);
SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).reduce(aggs, context);
assertThat(reducedAgg.getBuckets().size(), equalTo(2));
assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8L));
assertThat(reducedAgg.getBuckets().get(0).getSubsetSize(), equalTo(16L));
@ -196,7 +196,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
@Override
SignificantStringTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) {
return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF,
subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW);
subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW, 0);
}
}
private class LongTestAggFactory extends TestAggFactory<SignificantLongTerms, SignificantLongTerms.Bucket> {
@ -210,7 +210,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
@Override
SignificantLongTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) {
return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY,
DocValueFormat.RAW);
DocValueFormat.RAW, 0);
}
}

View File

@ -58,7 +58,7 @@ public class SignificantLongTermsTests extends InternalSignificantTermsTestCase
for (int i = 0; i < numBuckets; ++i) {
long term = randomValueOtherThanMany(l -> terms.add(l) == false, random()::nextLong);
SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(subsetDfs[i], subsetSize,
supersetDfs[i], supersetSize, term, aggs, format);
supersetDfs[i], supersetSize, term, aggs, format, 0);
bucket.updateScore(significanceHeuristic);
buckets.add(bucket);
}
@ -109,7 +109,7 @@ public class SignificantLongTermsTests extends InternalSignificantTermsTestCase
case 5:
buckets = new ArrayList<>(buckets);
buckets.add(new SignificantLongTerms.Bucket(randomLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format));
randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format, 0));
break;
case 8:
if (metaData == null) {

View File

@ -51,7 +51,7 @@ public class SignificantStringTermsTests extends InternalSignificantTermsTestCas
for (int i = 0; i < numBuckets; ++i) {
BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10)));
SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(term, subsetDfs[i], subsetSize,
supersetDfs[i], supersetSize, aggs, format);
supersetDfs[i], supersetSize, aggs, format, 0);
bucket.updateScore(significanceHeuristic);
buckets.add(bucket);
}
@ -103,7 +103,7 @@ public class SignificantStringTermsTests extends InternalSignificantTermsTestCas
buckets = new ArrayList<>(buckets);
buckets.add(new SignificantStringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
InternalAggregations.EMPTY, format));
InternalAggregations.EMPTY, format, 0));
break;
case 8:
if (metaData == null) {

View File

@ -1073,7 +1073,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY),
new NoneCircuitBreakerService()), null, true);
for (InternalAggregation internalAgg : aggs) {
InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx);
InternalAggregation mergedAggs = internalAgg.reduce(aggs, ctx);
assertTrue(mergedAggs instanceof DoubleTerms);
long expected = numLongs + numDoubles;
List<? extends Terms.Bucket> buckets = ((DoubleTerms) mergedAggs).getBuckets();

View File

@ -23,8 +23,6 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalAggregationTestCase;
@ -95,7 +93,7 @@ public class InternalAvgTests extends InternalAggregationTestCase<InternalAvg> {
aggregations.add(new InternalAvg("dummy1", value, 1, null, null, null));
}
InternalAvg internalAvg = new InternalAvg("dummy2", 0, 0, null, null, null);
InternalAvg reduced = internalAvg.doReduce(aggregations, null);
InternalAvg reduced = internalAvg.reduce(aggregations, null);
assertEquals(expected, reduced.getValue(), delta);
}

View File

@ -225,7 +225,7 @@ public class InternalExtendedStatsTests extends InternalAggregationTestCase<Inte
aggregations.add(new InternalExtendedStats("dummy1", 1, 0.0, 0.0, 0.0, sumOfSqrs, sigma, null, null, null));
}
InternalExtendedStats stats = new InternalExtendedStats("dummy", 1, 0.0, 0.0, 0.0, 0.0, sigma, null, null, null);
InternalExtendedStats reduced = stats.doReduce(aggregations, null);
InternalExtendedStats reduced = stats.reduce(aggregations, null);
assertEquals(expectedSumOfSqrs, reduced.getSumOfSquares(), delta);
}
}

View File

@ -114,7 +114,7 @@ public class InternalStatsTests extends InternalAggregationTestCase<InternalStat
aggregations.add(new InternalStats("dummy1", 1, value, value, value, null, null, null));
}
InternalStats internalStats = new InternalStats("dummy2", 0, 0.0, 2.0, 0.0, null, null, null);
InternalStats reduced = internalStats.doReduce(aggregations, null);
InternalStats reduced = internalStats.reduce(aggregations, null);
assertEquals("dummy2", reduced.getName());
assertEquals(values.length, reduced.getCount());
assertEquals(expectedSum, reduced.getSum(), delta);

View File

@ -87,7 +87,7 @@ public class InternalSumTests extends InternalAggregationTestCase<InternalSum> {
aggregations.add(new InternalSum("dummy1", value, null, null, null));
}
InternalSum internalSum = new InternalSum("dummy", 0, null, null, null);
InternalSum reduced = internalSum.doReduce(aggregations, null);
InternalSum reduced = internalSum.reduce(aggregations, null);
assertEquals(expected, reduced.value(), delta);
}

View File

@ -77,7 +77,7 @@ public class AvgBucketAggregatorTests extends AggregatorTestCase {
* it is fixed.
*
* Note: we have this test inside of the `avg_bucket` package so that we can get access to the package-private
* `doReduce()` needed for testing this
* `reduce()` needed for testing this
*/
public void testSameAggNames() throws IOException {
Query query = new MatchAllDocsQuery();

View File

@ -385,7 +385,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(),
reduceBucketConsumer, false);
A reduced = (A) aggs.get(0).doReduce(toReduce, context);
A reduced = (A) aggs.get(0).reduce(toReduce, context);
doAssertReducedMultiBucketConsumer(reduced, reduceBucketConsumer);
aggs = new ArrayList<>(aggs.subList(r, toReduceSize));
aggs.add(reduced);
@ -396,7 +396,12 @@ public abstract class AggregatorTestCase extends ESTestCase {
new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true);
@SuppressWarnings("unchecked")
A internalAgg = (A) aggs.get(0).doReduce(aggs, context);
A internalAgg = (A) aggs.get(0).reduce(aggs, context);
// materialize any parent pipelines
internalAgg = (A) internalAgg.reducePipelines(internalAgg, context);
// materialize any sibling pipelines at top level
if (internalAgg.pipelineAggregators().size() > 0) {
for (PipelineAggregator pipelineAggregator : internalAgg.pipelineAggregators()) {
internalAgg = (A) pipelineAggregator.reduce(internalAgg, context);

View File

@ -64,7 +64,7 @@ public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.S
}
@Override
public InternalSimpleLongValue doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSimpleLongValue reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View File

@ -182,7 +182,7 @@ public class InternalStringStats extends InternalAggregation {
}
@Override
public InternalStringStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStringStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
long totalLength = 0;
int minLength = Integer.MAX_VALUE;

View File

@ -610,7 +610,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase {
ScriptService scriptService = mock(ScriptService.class);
InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true);
InternalAggregation reduced = ((InternalDateHistogram)unrolled).doReduce(Collections.singletonList(unrolled), context);
InternalAggregation reduced = ((InternalDateHistogram)unrolled).reduce(Collections.singletonList(unrolled), context);
assertThat(reduced.toString(), equalTo("{\"histo\":{\"buckets\":[{\"key_as_string\":\"1970-01-01T00:00:00.100Z\",\"key\":100," +
"\"doc_count\":1},{\"key_as_string\":\"1970-01-01T00:00:00.200Z\",\"key\":200,\"doc_count\":1}," +
"{\"key_as_string\":\"1970-01-01T00:00:00.300Z\",\"key\":300,\"doc_count\":0,\"histo._count\":{\"value\":0.0}}," +

View File

@ -42,7 +42,7 @@ class TestMultiValueAggregation extends InternalNumericMetricsAggregation.MultiV
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException();
}

View File

@ -37,7 +37,7 @@ public class TestSingleValueAggregation extends InternalAggregation {
}
@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException();
}