mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-28 02:48:38 +00:00
Skip sibling pipeline aggregators reduction during non-final reduce (#40101)
Today a coordinating node forces a final reduction of sibling pipeline aggregators whenever reducing aggs, unless it is reducing aggs incrementally. This works well for incremental reduction of aggs, but breaks CCS when minimizing roundtrips as each cluster ends up reducing its own pipeline aggregators locally while that should only be done by the CCS coordinating node later. This causes issues as after their reduction, pipeline aggs cannot be further reduced, which is what happens with CCS causing errors like "java.lang.UnsupportedOperationException: Not supported" being returned. Each coordinating node should rather honour the reduce context flag that indicates whether we are executing a final reduce or not. If not, it should leave the sibling pipeline aggregations alone. Note that his bug affects only pipeline aggs that don't have a parent in the aggs tree, while all the others work well. Relates to #40059 but does not fix it yet, as the CCS coordinating node also needs to be adapted to recreate sibling pipeline aggregators from the request.
This commit is contained in:
parent
83f12a3d9c
commit
803ec46331
server/src/main/java/org/elasticsearch
action/search
search/aggregations
@ -42,7 +42,6 @@ import org.elasticsearch.search.SearchPhaseResult;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.dfs.AggregatedDfs;
|
||||
import org.elasticsearch.search.dfs.DfsSearchResult;
|
||||
@ -65,8 +64,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public final class SearchPhaseController {
|
||||
|
||||
@ -488,8 +485,8 @@ public final class SearchPhaseController {
|
||||
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
|
||||
}
|
||||
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
|
||||
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
|
||||
firstResult.pipelineAggregators(), reduceContext);
|
||||
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
|
||||
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
|
||||
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
|
||||
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
|
||||
reducedCompletionSuggestions);
|
||||
@ -499,32 +496,6 @@ public final class SearchPhaseController {
|
||||
firstResult.sortValueFormats(), numReducePhases, size, from, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
|
||||
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
|
||||
*/
|
||||
private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
|
||||
ReduceContext reduceContext = reduceContextFunction.apply(false);
|
||||
return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
|
||||
null, reduceContext);
|
||||
}
|
||||
|
||||
private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
|
||||
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
|
||||
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
|
||||
if (pipelineAggregators != null) {
|
||||
List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
|
||||
.map((p) -> (InternalAggregation) p)
|
||||
.collect(Collectors.toList());
|
||||
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
|
||||
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
|
||||
newAggs.add(newAgg);
|
||||
}
|
||||
return new InternalAggregations(newAggs);
|
||||
}
|
||||
return aggregations;
|
||||
}
|
||||
|
||||
public static final class ReducedQueryPhase {
|
||||
// the sum of all hits across all reduces shards
|
||||
final TotalHits totalHits;
|
||||
@ -644,7 +615,8 @@ public final class SearchPhaseController {
|
||||
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
|
||||
if (index == bufferSize) {
|
||||
if (hasAggs) {
|
||||
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
|
||||
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
|
||||
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
|
||||
Arrays.fill(aggsBuffer, null);
|
||||
aggsBuffer[0] = reducedAggs;
|
||||
}
|
||||
|
@ -315,8 +315,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||
if (localIndices != null) {
|
||||
ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
|
||||
false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
|
||||
//here we provide the empty string a cluster alias, which means no prefix in index name,
|
||||
//but the coord node will perform non final reduce as it's not null.
|
||||
SearchRequest ccsLocalSearchRequest = SearchRequest.crossClusterSearch(searchRequest, localIndices.indices(),
|
||||
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
|
||||
localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
|
||||
|
@ -161,8 +161,7 @@ public class AggregatorFactories {
|
||||
}
|
||||
}
|
||||
|
||||
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0],
|
||||
new ArrayList<PipelineAggregationBuilder>());
|
||||
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0], new ArrayList<>());
|
||||
|
||||
private AggregatorFactory<?>[] factories;
|
||||
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
|
||||
|
@ -61,7 +61,7 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
|
||||
/**
|
||||
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
|
||||
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
|
||||
* Operations that are potentially loosing information can only be applied during the final reduce phase.
|
||||
* Operations that are potentially losing information can only be applied during the final reduce phase.
|
||||
*/
|
||||
public boolean isFinalReduce() {
|
||||
return isFinalReduce;
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -52,19 +53,26 @@ public final class InternalAggregations extends Aggregations implements Streamab
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new addAggregation.
|
||||
* Constructs a new aggregation.
|
||||
*/
|
||||
public InternalAggregations(List<InternalAggregation> aggregations) {
|
||||
super(aggregations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces the given lists of addAggregation.
|
||||
*
|
||||
* @param aggregationsList A list of aggregation to reduce
|
||||
* @return The reduced addAggregation
|
||||
* Reduces the given list of aggregations
|
||||
*/
|
||||
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
|
||||
return reduce(aggregationsList, null, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces the given list of aggregations as well as the provided sibling pipeline aggregators.
|
||||
* Note that sibling pipeline aggregators are ignored when non final reduction is performed.
|
||||
*/
|
||||
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
|
||||
List<SiblingPipelineAggregator> siblingPipelineAggregators,
|
||||
ReduceContext context) {
|
||||
if (aggregationsList.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
@ -89,6 +97,15 @@ public final class InternalAggregations extends Aggregations implements Streamab
|
||||
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
|
||||
reducedAggregations.add(first.reduce(aggregations, context));
|
||||
}
|
||||
|
||||
if (siblingPipelineAggregators != null) {
|
||||
if (context.isFinalReduce()) {
|
||||
for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) {
|
||||
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
|
||||
reducedAggregations.add(newAgg);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new InternalAggregations(reducedAggregations);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user