re-work #60196, to not skip building change collectors as otherwise date histogram only pivots would run slow relates #60125
This commit is contained in:
parent
bbacad648a
commit
5eb04fb413
|
@ -345,7 +345,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|||
// create the function
|
||||
function = FunctionFactory.create(getConfig());
|
||||
|
||||
if (isContinuous() && function.supportsIncrementalBucketUpdate()) {
|
||||
if (isContinuous()) {
|
||||
changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
|
|||
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Rounding;
|
||||
import org.elasticsearch.common.geo.GeoPoint;
|
||||
import org.elasticsearch.geometry.Rectangle;
|
||||
|
@ -286,14 +287,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|||
}
|
||||
}
|
||||
|
||||
public CompositeBucketsChangeCollector(CompositeAggregationBuilder compositeAggregation, Map<String, FieldCollector> fieldCollectors) {
|
||||
public CompositeBucketsChangeCollector(
|
||||
@Nullable CompositeAggregationBuilder compositeAggregation,
|
||||
Map<String, FieldCollector> fieldCollectors
|
||||
) {
|
||||
this.compositeAggregation = compositeAggregation;
|
||||
this.fieldCollectors = fieldCollectors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchSourceBuilder buildChangesQuery(SearchSourceBuilder sourceBuilder, Map<String, Object> position, int pageSize) {
|
||||
|
||||
sourceBuilder.size(0);
|
||||
for (FieldCollector fieldCollector : fieldCollectors.values()) {
|
||||
AggregationBuilder aggregationForField = fieldCollector.aggregateChanges();
|
||||
|
@ -304,9 +307,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|||
pageSize = Math.min(pageSize, fieldCollector.getMaxPageSize());
|
||||
}
|
||||
|
||||
CompositeAggregationBuilder changesAgg = this.compositeAggregation;
|
||||
changesAgg.size(pageSize).aggregateAfter(position);
|
||||
sourceBuilder.aggregation(changesAgg);
|
||||
if (compositeAggregation != null) {
|
||||
CompositeAggregationBuilder changesAgg = compositeAggregation;
|
||||
changesAgg.size(pageSize).aggregateAfter(position);
|
||||
sourceBuilder.aggregation(changesAgg);
|
||||
}
|
||||
|
||||
return sourceBuilder;
|
||||
}
|
||||
|
@ -364,7 +369,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|||
}
|
||||
|
||||
public static ChangeCollector buildChangeCollector(
|
||||
CompositeAggregationBuilder compositeAggregationBuilder,
|
||||
@Nullable CompositeAggregationBuilder compositeAggregationBuilder,
|
||||
Map<String, SingleGroupSource> groups,
|
||||
String synchronizationField
|
||||
) {
|
||||
|
|
|
@ -215,8 +215,15 @@ public class Pivot implements Function {
|
|||
|
||||
@Override
|
||||
public ChangeCollector buildChangeCollector(String synchronizationField) {
|
||||
CompositeAggregationBuilder aggregationBuilder = null;
|
||||
|
||||
// skip if none of the group_by's requires it
|
||||
if (supportsIncrementalBucketUpdate()) {
|
||||
aggregationBuilder = createCompositeAggregationSources(config, true);
|
||||
}
|
||||
|
||||
return CompositeBucketsChangeCollector.buildChangeCollector(
|
||||
createCompositeAggregationSources(config, true),
|
||||
aggregationBuilder,
|
||||
config.getGroupConfig().getGroups(),
|
||||
synchronizationField
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue