diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 993fe548cc9..004fd51398a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -133,12 +134,16 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction listener = ActionListener.wrap( + dataExtractorFactory -> { + assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)); + assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future."); + }, + e -> fail() + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener); + + // Test with remote index, aggregation, and chunking + datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); + listener = ActionListener.wrap( + dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)), + e -> fail() + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener); + + // Test with remote index, no aggregation, and no chunking + datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setIndices(Collections.singletonList("cluster_two:my_index")); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + + listener = ActionListener.wrap( + dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)), + e -> fail() + ); + + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener); + + // Test with remote index, no aggregation, and chunking + datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); + listener = ActionListener.wrap( + dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)), + e -> fail() + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener); + } + public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoChunk() { givenAggregatableRollup("myField", "max", 5, "termField"); DataDescription.Builder dataDescription = new DataDescription.Builder();