[ML] Throw an error when a datafeed needs CCS but it is not enabled for the node (#46044) (#46096)

Though we allow CCS within datafeeds, users could prevent nodes from accessing remote clusters. This can cause mysterious errors that are difficult to troubleshoot.

This commit adds a check to verify that `cluster.remote.connect` is enabled on the current node when a datafeed is configured with a remote index pattern.
This commit is contained in:
Benjamin Trent 2019-08-30 09:27:07 -05:00 committed by GitHub
parent db949847e5
commit d0c5573a51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 92 additions and 5 deletions

View File

@ -72,6 +72,11 @@ those same roles.
`indices`:: `indices`::
(Required, array) An array of index names. Wildcards are supported. For (Required, array) An array of index names. Wildcards are supported. For
example: `["it_ops_metrics", "server*"]`. example: `["it_ops_metrics", "server*"]`.
+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--
`job_id`:: `job_id`::
(Required, string) A numerical character string that uniquely identifies the (Required, string) A numerical character string that uniquely identifies the

View File

@ -49,6 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL = public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node [{2}] is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes.";
public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable"; public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected"; public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";

View File

@ -546,7 +546,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
jobConfigProvider, jobConfigProvider,
jobResultsProvider, jobResultsProvider,
datafeedConfigProvider, datafeedConfigProvider,
jobResultsPersister); jobResultsPersister,
settings,
clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager); System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager); this.datafeedManager.set(datafeedManager);

View File

@ -36,6 +36,7 @@ import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
@ -46,6 +47,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@ -84,6 +86,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final AnomalyDetectionAuditor auditor; private final AnomalyDetectionAuditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry; private final NamedXContentRegistry xContentRegistry;
private final boolean remoteClusterSearchSupported;
@Inject @Inject
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@ -102,6 +105,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
this.auditor = auditor; this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry; this.xContentRegistry = xContentRegistry;
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
} }
static void validate(Job job, static void validate(Job job,
@ -180,7 +184,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
}; };
// Verify data extractor factory can be created, then start persistent task // Verify data extractor factory can be created, then start persistent task
Consumer<Job> createDataExtrator = job -> { Consumer<Job> createDataExtractor = job -> {
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) { if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker = final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
@ -192,6 +196,13 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
response -> { response -> {
if (response.isSuccess() == false) { if (response.isSuccess() == false) {
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response)); listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
} else if (remoteClusterSearchSupported == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else { } else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
} }
@ -213,7 +224,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
Job job = jobBuilder.build(); Job job = jobBuilder.build();
validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry); validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry); auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
createDataExtrator.accept(job); createDataExtractor.accept(job);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }

View File

@ -8,17 +8,22 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@ -30,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -45,11 +51,13 @@ public class DatafeedJobBuilder {
private final JobResultsProvider jobResultsProvider; private final JobResultsProvider jobResultsProvider;
private final DatafeedConfigProvider datafeedConfigProvider; private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister; private final JobResultsPersister jobResultsPersister;
private final boolean remoteClusterSearchSupported;
private final String nodeName;
public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider, Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister) { JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
this.client = client; this.client = client;
this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor); this.auditor = Objects.requireNonNull(auditor);
@ -58,6 +66,8 @@ public class DatafeedJobBuilder {
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider); this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
} }
void build(String datafeedId, ActionListener<DatafeedJob> listener) { void build(String datafeedId, ActionListener<DatafeedJob> listener) {
@ -168,6 +178,18 @@ public class DatafeedJobBuilder {
configBuilder -> { configBuilder -> {
try { try {
datafeedConfigHolder.set(configBuilder.build()); datafeedConfigHolder.set(configBuilder.build());
if (remoteClusterSearchSupported == false) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
if (remoteIndices.isEmpty() == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
configBuilder.getId(),
remoteIndices,
nodeName)));
return;
}
}
jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener); jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);

View File

@ -13,10 +13,12 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@ -92,7 +94,9 @@ public class DatafeedJobBuilderTests extends ESTestCase {
jobConfigProvider, jobConfigProvider,
jobResultsProvider, jobResultsProvider,
datafeedConfigProvider, datafeedConfigProvider,
jobResultsPersister); jobResultsPersister,
Settings.EMPTY,
"test_node");
} }
public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
@ -202,6 +206,46 @@ public class DatafeedJobBuilderTests extends ESTestCase {
verify(taskHandler).accept(error); verify(taskHandler).accept(error);
} }
// Verifies that DatafeedJobBuilder.build(...) fails fast with the
// DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH message when the datafeed is configured
// with a remote index pattern but cluster.remote.connect is disabled on the node.
public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
// Disable remote cluster connections for this node's settings.
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
// Rebuild the builder under test with remote search disabled and a fixed node name
// so the expected error message can be asserted exactly.
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
settings,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
// "remotecluster:index-*" is a cross-cluster index pattern; this is what must trigger the failure.
DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));
// Tracks that the failure path of the listener was actually invoked (build is async).
AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
// Expect the exact templated message: datafeed id, remote indices, node name.
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);
// givenJob/givenDatafeed stub the config providers (defined elsewhere in this test class).
givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
// Wait for the async failure callback to fire.
assertBusy(() -> wasHandlerCalled.get());
}
private void givenJob(Job.Builder job) { private void givenJob(Job.Builder job) {
Mockito.doAnswer(invocationOnMock -> { Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")