diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 6456564a8e1..64b083328ba 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -345,7 +345,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer fieldCollectors) { + public CompositeBucketsChangeCollector( + @Nullable CompositeAggregationBuilder compositeAggregation, + Map fieldCollectors + ) { this.compositeAggregation = compositeAggregation; this.fieldCollectors = fieldCollectors; } @Override public SearchSourceBuilder buildChangesQuery(SearchSourceBuilder sourceBuilder, Map position, int pageSize) { - sourceBuilder.size(0); for (FieldCollector fieldCollector : fieldCollectors.values()) { AggregationBuilder aggregationForField = fieldCollector.aggregateChanges(); @@ -304,9 +307,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector { pageSize = Math.min(pageSize, fieldCollector.getMaxPageSize()); } - CompositeAggregationBuilder changesAgg = this.compositeAggregation; - changesAgg.size(pageSize).aggregateAfter(position); - sourceBuilder.aggregation(changesAgg); + if (compositeAggregation != null) { + CompositeAggregationBuilder changesAgg = compositeAggregation; + changesAgg.size(pageSize).aggregateAfter(position); + sourceBuilder.aggregation(changesAgg); + } return sourceBuilder; } @@ -364,7 +369,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector { } public static ChangeCollector buildChangeCollector( - CompositeAggregationBuilder compositeAggregationBuilder, + @Nullable CompositeAggregationBuilder compositeAggregationBuilder, Map groups, String synchronizationField ) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 54afac5ea0f..58c16fcd964 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -215,8 +215,15 @@ public class Pivot implements Function { @Override public ChangeCollector buildChangeCollector(String synchronizationField) { + CompositeAggregationBuilder aggregationBuilder = null; + + // skip if none of the group_by's requires it + if (supportsIncrementalBucketUpdate()) { + aggregationBuilder = createCompositeAggregationSources(config, true); + } + return CompositeBucketsChangeCollector.buildChangeCollector( - createCompositeAggregationSources(config, true), + aggregationBuilder, config.getGroupConfig().getGroups(), synchronizationField );