diff --git a/docs/reference/settings/ml-settings.asciidoc b/docs/reference/settings/ml-settings.asciidoc index 09fb8adad85..91afcbe5b34 100644 --- a/docs/reference/settings/ml-settings.asciidoc +++ b/docs/reference/settings/ml-settings.asciidoc @@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node. IMPORTANT: This setting assumes some external process is capable of adding ML nodes to the cluster. This setting is only useful when used in conjunction with such an external process. + +`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>):: +The connection timeout for {ml} processes that run separately from the {es} JVM. +Some {ml} processing is done by processes that run separately +from the {es} JVM. When such processes are started, they must connect to the {es} +JVM. If such a process does not connect within the time period specified by this +setting, then the process is assumed to have failed. Defaults to `10s`. The minimum +value for this setting is `5s`. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 69830e2573c..26bc333ad9b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -325,6 +325,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_OPEN_JOBS_PER_NODE = Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope); + public static final Setting PROCESS_CONNECT_TIMEOUT = + Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope); + // Undocumented setting for integration test purposes public static final Setting MIN_DISK_SPACE_OFF_HEAP = 
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); @@ -362,6 +366,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public List> getSettings() { return Collections.unmodifiableList( Arrays.asList(MachineLearningField.AUTODETECT_PROCESS, + PROCESS_CONNECT_TIMEOUT, ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, MachineLearningField.MAX_MODEL_MEMORY_LIMIT, @@ -479,8 +484,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu nativeController, client, clusterService); - normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController); - analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController); + normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService); + analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService); } catch (IOException e) { // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so // only log this at the lowest level of detail. 
It's almost always "file not found" on a named pipe we expect to be diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 14743b93dc4..0429ca4fc63 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) { + public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService 
clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -74,7 +84,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { filesToDelete); try { analyticsBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch data frame analytics process for job " + jobId; LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index ec0d834cd24..a683f856c8c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; @@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final Logger LOGGER = 
LogManager.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Client client; private final Environment env; private final Settings settings; private final NativeController nativeController; private final ClusterService clusterService; + private volatile Duration processConnectTimeout; public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, ClusterService clusterService) { @@ -52,6 +53,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory this.nativeController = Objects.requireNonNull(nativeController); this.client = client; this.clusterService = clusterService; + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -88,8 +96,8 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory } } - private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, - List filesToDelete) { + void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, + List filesToDelete) { try { Settings updatedSettings = Settings.builder() @@ -109,7 +117,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory autodetectBuilder.quantiles(autodetectParams.quantiles()); } autodetectBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch 
autodetect for job " + job.getId(); LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 21f7229aef1..6900bcba117 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) { + public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, 
ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -64,7 +74,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch normalizer for job " + jobId; LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java new file mode 100644 index 00000000000..095df55f71c --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class NativeAutodetectProcessFactoryTests extends ESTestCase { + + public void testSetProcessConnectTimeout() throws IOException { + + int timeoutSeconds = randomIntBetween(5, 100); + + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + Environment env = TestEnvironment.newEnvironment(settings); + NativeController nativeController = mock(NativeController.class); + Client client = mock(Client.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + Job job = mock(Job.class); + 
when(job.getId()).thenReturn("set_process_connect_test_job"); + AutodetectParams autodetectParams = mock(AutodetectParams.class); + ProcessPipes processPipes = mock(ProcessPipes.class); + + NativeAutodetectProcessFactory nativeAutodetectProcessFactory = + new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService); + nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds)); + nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList()); + + verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds))); + } +}