diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java index 57de1e80906..78ad3297c34 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.ml.process.ProcessResultsParser; import java.io.IOException; import java.nio.file.Path; -import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -26,9 +25,9 @@ abstract class AbstractNativeAnalyticsProcess extends AbstractNativeProc protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser resultParser, String jobId, ProcessPipes processPipes, int numberOfFields, - List filesToDelete, Consumer onProcessCrash, Duration processConnectTimeout, + List filesToDelete, Consumer onProcessCrash, NamedXContentRegistry namedXContentRegistry) { - super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout); + super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash); this.name = Objects.requireNonNull(name); this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java index 210dabda181..4c41b8f9956 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ml.process.StateToProcessWriterHelper; import java.io.IOException; import java.io.OutputStream; import java.nio.file.Path; -import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -36,10 +35,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess filesToDelete, - Consumer onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config, + Consumer onProcessCrash, AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) { super(NAME, AnalyticsResult.PARSER, jobId, processPipes, numberOfFields, - filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry); + filesToDelete, onProcessCrash, namedXContentRegistry); this.config = Objects.requireNonNull(config); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index cb419033ea3..dc9c90f7324 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -72,7 +72,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory onProcessCrash) { String jobId = config.getId(); List filesToDelete = new ArrayList<>(); - ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, + ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId, false, true, true, hasState, config.getAnalysis().persistsState()); // The extra 2 are for the checksum and the control field @@ -81,7 +81,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory filesToDelete, - Consumer onProcessCrash, Duration processConnectTimeout) { + Consumer onProcessCrash) { super(NAME, MemoryUsageEstimationResult.PARSER, jobId, processPipes, - numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY); + numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index f6a28c65bf6..8c89cde3881 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -67,7 +67,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce // memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID // to ensure uniqueness between calls. ProcessPipes processPipes = new ProcessPipes( - env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(), + env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(), false, false, true, false, false); createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes); @@ -77,8 +77,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce processPipes, 0, filesToDelete, - onProcessCrash, - processConnectTimeout); + onProcessCrash); try { process.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index be8f19007d4..e9f8d96fd5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.ml.process.ProcessResultsParser; import java.io.IOException; import java.io.OutputStream; import java.nio.file.Path; -import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; @@ -45,9 +44,8 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec NativeAutodetectProcess(String jobId, ProcessPipes processPipes, int numberOfFields, List filesToDelete, - ProcessResultsParser resultsParser, Consumer onProcessCrash, - Duration processConnectTimeout) { - super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout); + ProcessResultsParser resultsParser, Consumer onProcessCrash) { + super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash); this.resultsParser = resultsParser; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 2e1cc69f37f..39628fc4c1a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -76,9 +76,9 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); - ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(), - false, true, true, params.modelSnapshot() != null, - !AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings)); + ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT, + job.getId(), false, true, true, params.modelSnapshot() != null, + AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false); createNativeProcess(job, params, processPipes, filesToDelete); boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA && job.getAnalysisConfig().getCategorizationFieldName() != null; @@ -90,7 +90,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory NamedXContentRegistry.EMPTY); NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes, numberOfFields, - filesToDelete, resultsParser, onProcessCrash, processConnectTimeout); + filesToDelete, resultsParser, onProcessCrash); try { autodetect.start(executorService, stateProcessor); return autodetect; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index 559eacd7c35..8cf8c594e6e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -9,7 +9,6 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResult import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; import org.elasticsearch.xpack.ml.process.ProcessPipes; -import java.time.Duration; import java.util.Collections; /** @@ -19,8 +18,8 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize private static final String NAME = "normalizer"; - NativeNormalizerProcess(String jobId, ProcessPipes processPipes, Duration processConnectTimeout) { - super(jobId, processPipes, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout); + NativeNormalizerProcess(String jobId, ProcessPipes processPipes) { + super(jobId, processPipes, 0, Collections.emptyList(), (ignore) -> {}); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index b5bbe6c8326..15bfc03c1c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -54,11 +54,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory // The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times // in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names // are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls. - ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, + ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE, jobId + "_" + counter.incrementAndGet(), false, true, true, false, false); createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); - NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes, processConnectTimeout); + NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes); try { normalizerProcess.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 5df64442ba5..6d051c9c922 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -57,7 +57,6 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final int numberOfFields; private final List filesToDelete; private final Consumer onProcessCrash; - private final Duration processConnectTimeout; private volatile Future logTailFuture; private volatile Future stateProcessorFuture; private volatile boolean processCloseInitiated; @@ -65,15 +64,13 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean isReady; protected AbstractNativeProcess(String jobId, ProcessPipes processPipes, - int numberOfFields, List filesToDelete, Consumer onProcessCrash, - Duration processConnectTimeout) { + int numberOfFields, List filesToDelete, Consumer onProcessCrash) { this.jobId = jobId; this.processPipes = processPipes; this.startTime = ZonedDateTime.now(); this.numberOfFields = numberOfFields; this.filesToDelete = filesToDelete; this.onProcessCrash = Objects.requireNonNull(onProcessCrash); - this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout); } public abstract String getName(); @@ -86,7 +83,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { */ public void start(ExecutorService executorService) throws IOException { - processPipes.connectLogStream(processConnectTimeout); + processPipes.connectLogStream(); cppLogHandler.set(processPipes.getLogStreamHandler()); logTailFuture = executorService.submit(() -> { @@ -101,7 +98,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { } }); - processPipes.connectOtherStreams(processConnectTimeout); + processPipes.connectOtherStreams(); if (processPipes.getProcessInStream().isPresent()) { processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get())); this.recordWriter.set(new LengthEncodedWriter(processInStream.get())); @@ -220,7 +217,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { try { // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID. // Without the PID we cannot kill the process. - NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processConnectTimeout)); + NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processPipes.getTimeout())); // Wait for the process to die before closing processInStream as if the process // is still alive when processInStream is closed it may start persisting state diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index d5e26dcff2a..4953d636edf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -57,11 +57,11 @@ public class NativeController { private final OutputStream commandStream; NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException { - ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null, + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, true, false, false, false, false); - processPipes.connectLogStream(CONTROLLER_CONNECT_TIMEOUT); + processPipes.connectLogStream(); tailLogsInThread(processPipes.getLogStreamHandler()); - processPipes.connectOtherStreams(CONTROLLER_CONNECT_TIMEOUT); + processPipes.connectOtherStreams(); this.localNodeName = localNodeName; this.cppLogHandler = processPipes.getLogStreamHandler(); this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java index 95509b04481..6b385a1fbba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java @@ -36,6 +36,7 @@ public class ProcessPipes { public static final String RESTORE_IS_PIPE_ARG = "--restoreIsPipe"; public static final String PERSIST_ARG = "--persist="; public static final String PERSIST_IS_PIPE_ARG = "--persistIsPipe"; + public static final String TIMEOUT_ARG = "--namedPipeConnectTimeout="; private final NamedPipeHelper namedPipeHelper; private final String jobId; @@ -50,6 +51,14 @@ public class ProcessPipes { private final String restorePipeName; private final String persistPipeName; + /** + * Needs to be long enough for the C++ process perform all startup tasks that precede creation of named + * pipes. There should not be very many of these, so a short timeout should be fine. However, at least + * five seconds is recommended due to the vagaries of process scheduling and the way VMs can completely + * stall for some hypervisor actions. + */ + private final Duration timeout; + private CppLogMessageHandler logStreamHandler; private OutputStream commandStream; private OutputStream processInStream; @@ -66,11 +75,12 @@ public class ProcessPipes { * @param jobId The job ID of the process to which pipes are to be opened, if the process is associated with a specific job. * May be null or empty for processes not associated with a specific job. */ - public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, String processName, String jobId, + public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe, boolean wantRestorePipe, boolean wantPersistPipe) { this.namedPipeHelper = namedPipeHelper; this.jobId = jobId; + this.timeout = timeout; // The way the pipe names are formed MUST match what is done in the controller main() // function, as it does not get any command line arguments when started as a daemon. If @@ -116,6 +126,7 @@ public class ProcessPipes { command.add(PERSIST_ARG + persistPipeName); command.add(PERSIST_IS_PIPE_ARG); } + command.add(TIMEOUT_ARG + timeout.getSeconds()); } /** @@ -123,24 +134,16 @@ public class ProcessPipes { * started to read from itso that there is no risk of messages logged in between creation of the other pipes on the C++ * side from blocking due to filling up the named pipe's buffer, and hence deadlocking communications between that process * and this JVM. - * @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes. - * There should not be very many of these, so a short timeout should be fine. However, at least a couple of - * seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for - * some hypervisor actions. */ - public void connectLogStream(Duration timeout) throws IOException { + public void connectLogStream() throws IOException { logStreamHandler = new CppLogMessageHandler(jobId, namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout)); } /** * Connect the other pipes created by the C++ process after the logging pipe has been connected. This must be called after * the corresponding C++ process has been started, and after {@link #connectLogStream}. - * @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes. - * There should not be very many of these, so a short timeout should be fine. However, at least a couple of - * seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for - * some hypervisor actions. */ - public void connectOtherStreams(Duration timeout) throws IOException { + public void connectOtherStreams() throws IOException { assert logStreamHandler != null : "Must connect log stream before other streams"; if (logStreamHandler == null) { throw new NullPointerException("Must connect log stream before other streams"); @@ -255,4 +258,8 @@ public class ProcessPipes { } return Optional.of(persistStream); } + + public Duration getTimeout() { + return timeout; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 0d461e751ac..d07b666a956 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -40,10 +40,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class NativeAutodetectProcessTests extends ESTestCase { @@ -75,13 +72,14 @@ public class NativeAutodetectProcessTests extends ESTestCase { when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream)); when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream)); when(processPipes.getPersistStream()).thenReturn(Optional.of(persistStream)); + when(processPipes.getTimeout()).thenReturn(Duration.ofSeconds(randomIntBetween(5, 100))); } @SuppressWarnings("unchecked") public void testProcessStartTime() throws Exception { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", processPipes, NUMBER_FIELDS, null, - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { process.start(executorService, mock(IndexingStateProcessor.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -99,7 +97,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { String[] record = {"r1", "r2", "r3", "r4", "r5"}; try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { process.start(executorService, mock(IndexingStateProcessor.class)); process.writeRecord(record); @@ -129,7 +127,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { public void testFlush() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { process.start(executorService, mock(IndexingStateProcessor.class)); FlushJobParams params = FlushJobParams.builder().build(); @@ -158,8 +156,7 @@ public class NativeAutodetectProcessTests extends ESTestCase { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), - Duration.ZERO)) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { process.start(executorService); process.consumeAndCloseOutputStream(); @@ -167,28 +164,11 @@ public class NativeAutodetectProcessTests extends ESTestCase { } } - @SuppressWarnings("unchecked") - public void testPipeConnectTimeout() throws IOException { - - int timeoutSeconds = randomIntBetween(5, 100); - - try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", - processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), - Duration.ofSeconds(timeoutSeconds))) { - - process.start(executorService); - } - - verify(processPipes, times(1)).connectLogStream(eq(Duration.ofSeconds(timeoutSeconds))); - verify(processPipes, times(1)).connectOtherStreams(eq(Duration.ofSeconds(timeoutSeconds))); - } - @SuppressWarnings("unchecked") private void testWriteMessage(CheckedConsumer writeFunction, String expectedMessageCode) throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", processPipes, NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { process.start(executorService, mock(IndexingStateProcessor.class)); writeFunction.accept(process); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index a066d25fd3f..f6a3ccafe83 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -187,7 +187,7 @@ public class AbstractNativeProcessTests extends ESTestCase { private class TestNativeProcess extends AbstractNativeProcess { TestNativeProcess() { - super("foo", processPipes, 0, null, onProcessCrash, Duration.ZERO); + super("foo", processPipes, 0, null, onProcessCrash); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java index 30a88442750..3d362c90eda 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java @@ -60,12 +60,13 @@ public class ProcessPipesTests extends ESTestCase { when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class))) .thenReturn(new ByteArrayInputStream(PERSIST_BYTES)); - ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job", - false, true, true, true, true); + int timeoutSeconds = randomIntBetween(5, 100); + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT, + "my_job", false, true, true, true, true); List command = new ArrayList<>(); processPipes.addArgs(command); - assertEquals(9, command.size()); + assertEquals(10, command.size()); assertEquals(ProcessPipes.LOG_PIPE_ARG, command.get(0).substring(0, ProcessPipes.LOG_PIPE_ARG.length())); assertEquals(ProcessPipes.INPUT_ARG, command.get(1).substring(0, ProcessPipes.INPUT_ARG.length())); assertEquals(ProcessPipes.INPUT_IS_PIPE_ARG, command.get(2)); @@ -75,15 +76,16 @@ public class ProcessPipesTests extends ESTestCase { assertEquals(ProcessPipes.RESTORE_IS_PIPE_ARG, command.get(6)); assertEquals(ProcessPipes.PERSIST_ARG, command.get(7).substring(0, ProcessPipes.PERSIST_ARG.length())); assertEquals(ProcessPipes.PERSIST_IS_PIPE_ARG, command.get(8)); + assertEquals(ProcessPipes.TIMEOUT_ARG + timeoutSeconds, command.get(9)); - processPipes.connectLogStream(Duration.ofSeconds(2)); + processPipes.connectLogStream(); CppLogMessageHandler logMessageHandler = processPipes.getLogStreamHandler(); assertNotNull(logMessageHandler); logMessageHandler.tailStream(); assertEquals(42, logMessageHandler.getPid(Duration.ZERO)); - processPipes.connectOtherStreams(Duration.ofSeconds(2)); + processPipes.connectOtherStreams(); assertFalse(processPipes.getCommandStream().isPresent()); assertTrue(processPipes.getProcessInStream().isPresent()); @@ -108,7 +110,7 @@ public class ProcessPipesTests extends ESTestCase { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = TestEnvironment.newEnvironment(settings); - new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job", + new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", true, true, true, true, true); } @@ -135,11 +137,11 @@ public class ProcessPipesTests extends ESTestCase { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = TestEnvironment.newEnvironment(settings); - ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job", + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", true, true, true, true, true); - processPipes.connectLogStream(Duration.ofSeconds(2)); - expectThrows(IOException.class, () -> processPipes.connectOtherStreams(Duration.ofSeconds(2))); + processPipes.connectLogStream(); + expectThrows(IOException.class, processPipes::connectOtherStreams); // check the pipes successfully opened were then closed verify(logStream, times(1)).close();