From 558e323c8983ac41ae53268831b7cb4d2cb53e68 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 25 Jun 2019 16:36:02 +0100 Subject: [PATCH] [ML] Introduce a setting for the process connect timeout (#43234) This change introduces a new setting, xpack.ml.process_connect_timeout, to enable the timeout for one of the external ML processes to connect to the ES JVM to be increased. The timeout may need to be increased if many processes are being started simultaneously on the same machine. This is unlikely in clusters with many ML nodes, as we balance the processes across the ML nodes, but can happen in clusters with a single ML node and a high value for xpack.ml.node_concurrent_job_allocations. --- docs/reference/settings/ml-settings.asciidoc | 8 +++ .../xpack/ml/MachineLearning.java | 9 ++- .../NativeAnalyticsProcessFactory.java | 16 ++++- .../NativeAutodetectProcessFactory.java | 16 +++-- .../NativeNormalizerProcessFactory.java | 16 ++++- .../NativeAutodetectProcessFactoryTests.java | 61 +++++++++++++++++++ 6 files changed, 114 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java diff --git a/docs/reference/settings/ml-settings.asciidoc b/docs/reference/settings/ml-settings.asciidoc index 09fb8adad85..91afcbe5b34 100644 --- a/docs/reference/settings/ml-settings.asciidoc +++ b/docs/reference/settings/ml-settings.asciidoc @@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node. IMPORTANT: This setting assumes some external process is capable of adding ML nodes to the cluster. This setting is only useful when used in conjunction with such an external process. + +`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>):: +The connection timeout for {ml} processes that run separately from the {es} JVM. +Some {ml} processing is done by processes that run separately +from the {es} JVM. 
When such processes are started they must connect to the {es} +JVM. If such a process does not connect within the time period specified by this +setting then the process is assumed to have failed. Defaults to `10s`. The minimum +value for this setting is `5s`. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 69830e2573c..26bc333ad9b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -325,6 +325,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_OPEN_JOBS_PER_NODE = Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope); + public static final Setting PROCESS_CONNECT_TIMEOUT = + Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope); + // Undocumented setting for integration test purposes public static final Setting MIN_DISK_SPACE_OFF_HEAP = Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); @@ -362,6 +366,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public List> getSettings() { return Collections.unmodifiableList( Arrays.asList(MachineLearningField.AUTODETECT_PROCESS, + PROCESS_CONNECT_TIMEOUT, ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, MachineLearningField.MAX_MODEL_MEMORY_LIMIT, @@ -479,8 +484,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu nativeController, client, clusterService); - normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController); - analyticsProcessFactory = new 
NativeAnalyticsProcessFactory(environment, nativeController); + normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService); + analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService); } catch (IOException e) { // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so // only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 14743b93dc4..0429ca4fc63 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class); private static final 
NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) { + public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -74,7 +84,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { filesToDelete); try { analyticsBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch data frame analytics process for job " + jobId; LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index ec0d834cd24..a683f856c8c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import 
org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; @@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Client client; private final Environment env; private final Settings settings; private final NativeController nativeController; private final ClusterService clusterService; + private volatile Duration processConnectTimeout; public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, ClusterService clusterService) { @@ -52,6 +53,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory this.nativeController = Objects.requireNonNull(nativeController); this.client = client; this.clusterService = clusterService; + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -88,8 +96,8 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory } } - private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, - List filesToDelete) { + void createNativeProcess(Job job, 
AutodetectParams autodetectParams, ProcessPipes processPipes, + List filesToDelete) { try { Settings updatedSettings = Settings.builder() @@ -109,7 +117,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory autodetectBuilder.quantiles(autodetectParams.quantiles()); } autodetectBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch autodetect for job " + job.getId(); LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 21f7229aef1..6900bcba117 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory private static final Logger LOGGER = 
LogManager.getLogger(NativeNormalizerProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) { + public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -64,7 +74,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch normalizer for job " + jobId; LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java new file mode 100644 index 00000000000..095df55f71c --- /dev/null +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class NativeAutodetectProcessFactoryTests extends ESTestCase { + + public void testSetProcessConnectTimeout() throws IOException { + + int timeoutSeconds = randomIntBetween(5, 100); + + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + Environment env = TestEnvironment.newEnvironment(settings); + NativeController nativeController = mock(NativeController.class); + Client client = mock(Client.class); + 
ClusterSettings clusterSettings = new ClusterSettings(settings, + Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + Job job = mock(Job.class); + when(job.getId()).thenReturn("set_process_connect_test_job"); + AutodetectParams autodetectParams = mock(AutodetectParams.class); + ProcessPipes processPipes = mock(ProcessPipes.class); + + NativeAutodetectProcessFactory nativeAutodetectProcessFactory = + new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService); + nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds)); + nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList()); + + verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds))); + } +}