diff --git a/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc index 9e31b4b3e25..85fb6d23e32 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc @@ -72,6 +72,11 @@ those same roles. `indices`:: (Required, array) An array of index names. Wildcards are supported. For example: `["it_ops_metrics", "server*"]`. ++ +-- +NOTE: If any indices are in remote clusters then `cluster.remote.connect` must +not be set to `false` on any ML node. +-- `job_id`:: (Required, string) A numerical character string that uniquely identifies the diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index f5e66fed8a8..9014724bf5a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -49,6 +49,9 @@ public final class Messages { public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL = "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; + public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" + + " but the current node [{2}] is not allowed to connect to remote clusters." + + " Please enable cluster.remote.connect for all machine learning nodes."; public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable"; public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3203bf9c51b..fe245fc4552 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -546,7 +546,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu jobConfigProvider, jobResultsProvider, datafeedConfigProvider, - jobResultsPersister); + jobResultsPersister, + settings, + clusterService.getNodeName()); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager); this.datafeedManager.set(datafeedManager); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 45e24419b05..1289f70cd27 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -46,6 +47,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; @@ -84,6 +86,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction createDataExtrator = job -> { + Consumer createDataExtractor = job -> { if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) { final RemoteClusterLicenseChecker remoteClusterLicenseChecker = new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); @@ -192,6 +196,13 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction { if (response.isSuccess() == false) { listener.onFailure(createUnlicensedError(params.getDatafeedId(), response)); + } else if (remoteClusterSearchSupported == false) { + listener.onFailure( + ExceptionsHelper.badRequestException(Messages.getMessage( + Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, + datafeedConfigHolder.get().getId(), + RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()), + clusterService.getNodeName()))); } else { createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); } @@ -213,7 +224,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction currentTimeSupplier, JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider, - JobResultsPersister jobResultsPersister) { + JobResultsPersister jobResultsPersister, Settings settings, String nodeName) { this.client = client; this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.auditor = Objects.requireNonNull(auditor); @@ -58,6 +66,8 @@ public class DatafeedJobBuilder { this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider); this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); + this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); + this.nodeName = nodeName; } void build(String datafeedId, ActionListener listener) { @@ -168,6 +178,18 @@ public class DatafeedJobBuilder { configBuilder -> { try { datafeedConfigHolder.set(configBuilder.build()); + if (remoteClusterSearchSupported == false) { + List remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()); + if (remoteIndices.isEmpty() == false) { + listener.onFailure( + ExceptionsHelper.badRequestException(Messages.getMessage( + Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, + configBuilder.getId(), + remoteIndices, + nodeName))); + return; + } + } jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener); } catch (Exception e) { listener.onFailure(e); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 11335236bfb..ac8657575e2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -13,10 +13,12 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -92,7 +94,9 @@ public class DatafeedJobBuilderTests extends ESTestCase { jobConfigProvider, jobResultsProvider, datafeedConfigProvider, - jobResultsPersister); + jobResultsPersister, + Settings.EMPTY, + "test_node"); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -202,6 +206,46 @@ public class DatafeedJobBuilderTests extends ESTestCase { verify(taskHandler).accept(error); } + public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception { + Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build(); + datafeedJobBuilder = + new DatafeedJobBuilder( + client, + xContentRegistry(), + auditor, + System::currentTimeMillis, + jobConfigProvider, + jobResultsProvider, + datafeedConfigProvider, + jobResultsPersister, + settings, + "test_node"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); + datafeed.setIndices(Collections.singletonList("remotecluster:index-*")); + + AtomicBoolean wasHandlerCalled = new AtomicBoolean(false); + ActionListener datafeedJobHandler = ActionListener.wrap( + datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"), + e -> { + assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, + "datafeed1", + "[remotecluster:index-*]", + "test_node"))); + wasHandlerCalled.compareAndSet(false, true); + } + ); + + givenJob(jobBuilder); + givenDatafeed(datafeed); + datafeedJobBuilder.build("datafeed1", datafeedJobHandler); + assertBusy(() -> wasHandlerCalled.get()); + } + private void givenJob(Job.Builder job) { Mockito.doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked")