diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java index 09cb528e0f4..ade63afa51a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java @@ -565,7 +565,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector { entry.getKey(), entry.getValue().getMissingBucket(), ((DateHistogramGroupSource) entry.getValue()).getRounding(), - entry.getKey().equals(synchronizationField) + entry.getValue().getField().equals(synchronizationField) ) ); break; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java index 2ee5a0553f1..a09cc2a4a75 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java @@ -15,12 +15,15 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSourceTests; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests; @@ -43,6 +46,7 @@ import java.util.Map.Entry; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -138,6 +142,60 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase { assertThat(((TermsQueryBuilder) queryBuilder).values(), containsInAnyOrder("id1", "id2", "id3")); } + public void testDateHistogramFieldCollector() throws IOException { + Map groups = new LinkedHashMap<>(); + + SingleGroupSource groupBy = new DateHistogramGroupSource( + "timestamp", + null, + false, + new DateHistogramGroupSource.FixedInterval(DateHistogramInterval.MINUTE), + null + ); + groups.put("output_timestamp", groupBy); + + ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector( + getCompositeAggregation(groups), + groups, + "timestamp" + ); + + QueryBuilder queryBuilder = collector.buildFilterQuery(66_666, 200_222); + assertNotNull(queryBuilder); + assertThat(queryBuilder, instanceOf(RangeQueryBuilder.class)); + // rounded down + assertThat(((RangeQueryBuilder) queryBuilder).from(), equalTo(Long.valueOf(60_000))); + assertTrue(((RangeQueryBuilder) queryBuilder).includeLower()); + assertThat(((RangeQueryBuilder) queryBuilder).fieldName(), equalTo("timestamp")); + + // timestamp field does not match + collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, "sync_timestamp"); + + queryBuilder = collector.buildFilterQuery(66_666, 200_222); + assertNull(queryBuilder); + + // field does not match, but output field equals sync field + collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, "output_timestamp"); + + queryBuilder = collector.buildFilterQuery(66_666, 200_222); + assertNull(queryBuilder); + + // missing bucket disables optimization + groupBy = new DateHistogramGroupSource( + "timestamp", + null, + true, + new DateHistogramGroupSource.FixedInterval(DateHistogramInterval.MINUTE), + null + ); + groups.put("output_timestamp", groupBy); + + collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, "timestamp"); + + queryBuilder = collector.buildFilterQuery(66_666, 200_222); + assertNull(queryBuilder); + } + private static CompositeAggregationBuilder getCompositeAggregation(Map groups) throws IOException { CompositeAggregationBuilder compositeAggregation; try (XContentBuilder builder = jsonBuilder()) {