[ML] Introduce a setting for the process connect timeout (#43234)

This change introduces a new setting,
xpack.ml.process_connect_timeout, to enable
the timeout for one of the external ML processes
to connect to the ES JVM to be increased.

The timeout may need to be increased if many
processes are being started simultaneously on
the same machine. This is unlikely in clusters
with many ML nodes, as we balance the processes
across the ML nodes, but can happen in clusters
with a single ML node and a high value for
xpack.ml.node_concurrent_job_allocations.
This commit is contained in:
David Roberts 2019-06-25 16:36:02 +01:00
parent 9493c145d7
commit 558e323c89
6 changed files with 114 additions and 12 deletions

View File

@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node.
IMPORTANT: This setting assumes some external process is capable of adding ML nodes
to the cluster. This setting is only useful when used in conjunction with
such an external process.
`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
The connection timeout for {ml} processes that run separately from the {es} JVM.
Defaults to `10s`. Some {ml} processing is done by processes that run separately
from the {es} JVM. When such processes are started they must connect to the {es}
JVM. If such a process does not connect within the time period specified by this
setting then the process is assumed to have failed. The minimum
value for this setting is `5s`.

View File

@ -325,6 +325,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);
// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
@ -362,6 +366,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public List<Setting<?>> getSettings() {
return Collections.unmodifiableList(
Arrays.asList(MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
@ -479,8 +484,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService);
} catch (IOException e) {
// The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
// only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;
public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) {
/**
 * Creates the factory and wires it to the dynamic
 * {@code xpack.ml.process_connect_timeout} cluster setting.
 *
 * @param env            node environment; its settings seed the initial timeout
 * @param nativeController controller used to launch native processes; must not be null
 * @param clusterService used only to register for dynamic setting updates
 */
public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
// Seed from the node's static settings first, then register so later
// cluster-level changes take effect without a node restart.
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}
/**
 * Update hook for the {@code xpack.ml.process_connect_timeout} setting.
 * Converts the cluster-level {@code TimeValue} into the {@code Duration}
 * used when waiting for a newly launched native process to connect.
 * The field written here is volatile, so the new value is visible to
 * subsequent process launches on any thread.
 */
void setProcessConnectTimeout(TimeValue timeout) {
    processConnectTimeout = Duration.ofMillis(timeout.getMillis());
}
@Override
@ -74,7 +84,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
filesToDelete);
try {
analyticsBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics process for job " + jobId;
LOGGER.error(msg);

View File

@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
private final Client client;
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
private final ClusterService clusterService;
private volatile Duration processConnectTimeout;
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
ClusterService clusterService) {
@ -52,6 +53,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
this.nativeController = Objects.requireNonNull(nativeController);
this.client = client;
this.clusterService = clusterService;
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}
/**
 * Update hook for the {@code xpack.ml.process_connect_timeout} setting.
 * Translates the incoming {@code TimeValue} to the {@code Duration} handed
 * to {@code ProcessPipes.connectStreams} for later process launches.
 * Writes a volatile field, so no additional synchronization is needed.
 */
void setProcessConnectTimeout(TimeValue timeout) {
    processConnectTimeout = Duration.ofMillis(timeout.getMillis());
}
@Override
@ -88,7 +96,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
}
}
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {
@ -109,7 +117,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch autodetect for job " + job.getId();
LOGGER.error(msg);

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
/**
 * Creates the factory and wires it to the dynamic
 * {@code xpack.ml.process_connect_timeout} cluster setting.
 *
 * @param env            node environment; its settings seed the initial timeout
 * @param nativeController controller used to launch native processes; must not be null
 * @param clusterService used only to register for dynamic setting updates
 */
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
// Seed from the node's static settings first, then register so later
// cluster-level changes take effect without a node restart.
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}
/**
 * Update hook for the {@code xpack.ml.process_connect_timeout} setting.
 * Stores the configured connect timeout as a {@code java.time.Duration}
 * for use when the next normalizer process is started; the target field
 * is volatile so the update is immediately visible across threads.
 */
void setProcessConnectTimeout(TimeValue timeout) {
    processConnectTimeout = Duration.ofMillis(timeout.getMillis());
}
@Override
@ -64,7 +74,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch normalizer for job " + jobId;
LOGGER.error(msg);

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class NativeAutodetectProcessFactoryTests extends ESTestCase {
// Verifies that the value passed to setProcessConnectTimeout() is the
// timeout the factory hands to ProcessPipes.connectStreams() when a
// native autodetect process is created.
public void testSetProcessConnectTimeout() throws IOException {
// Randomise within the valid range of xpack.ml.process_connect_timeout (minimum is 5s).
int timeoutSeconds = randomIntBetween(5, 100);
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Environment env = TestEnvironment.newEnvironment(settings);
NativeController nativeController = mock(NativeController.class);
Client client = mock(Client.class);
// The ClusterSettings must register every dynamic setting the factory's
// constructor subscribes to, otherwise addSettingsUpdateConsumer throws.
// NOTE(review): MAX_ANOMALY_RECORDS_SETTING_DYNAMIC is presumably also
// consumed during process creation — confirm against AutodetectBuilder.
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Job job = mock(Job.class);
when(job.getId()).thenReturn("set_process_connect_test_job");
AutodetectParams autodetectParams = mock(AutodetectParams.class);
ProcessPipes processPipes = mock(ProcessPipes.class);
NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
// Override the timeout after construction, then check it flows through
// to the pipe-connection call made while launching the process.
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
}
}