diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java index 4d468f80176..74ba4f53997 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java @@ -127,23 +127,53 @@ public class ProcessPipes { public void connectStreams(Duration timeout) throws IOException { // The order here is important. It must match the order that the C++ process tries to connect to the pipes, otherwise // a timeout is guaranteed. Also change api::CIoManager in the C++ code if changing the order here. - if (logPipeName != null) { - logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout); + try { + if (logPipeName != null) { + logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout); + } + if (commandPipeName != null) { + commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout); + } + if (processInPipeName != null) { + processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout); + } + if (processOutPipeName != null) { + processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout); + } + if (restorePipeName != null) { + restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout); + } + if (persistPipeName != null) { + persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout); + } + } catch (IOException ioe) { + try { + closeUnusedStreams(); + } catch (IOException suppressed) { + ioe.addSuppressed(new IOException("Error closing process pipes", suppressed)); + } + throw ioe; } - if (commandPipeName != null) { - commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout); + } + + private void closeUnusedStreams() throws IOException { + if (logStream != null) { + logStream.close(); } - if (processInPipeName != null) { - processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout); + if (commandStream != null) { + commandStream.close(); } - if (processOutPipeName != null) { - processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout); + if (processInStream != null) { + processInStream.close(); } - if (restorePipeName != null) { - restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout); + if (processOutStream != null) { + processOutStream.close(); } - if (persistPipeName != null) { - persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout); + if (restoreStream != null) { + restoreStream.close(); + } + if (persistStream != null) { + persistStream.close(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java index fa703e778c4..bc8c1c4e47a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java @@ -11,17 +11,21 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; -import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.time.Duration; import java.util.ArrayList; import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Matchers.contains; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ProcessPipesTests extends ESTestCase { @@ -34,7 +38,7 @@ public class ProcessPipesTests extends ESTestCase { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = TestEnvironment.newEnvironment(settings); - NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); + NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) .thenReturn(new ByteArrayInputStream(LOG_BYTES)); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); @@ -89,4 +93,48 @@ public class ProcessPipesTests extends ESTestCase { assertEquals(6, processPipes.getPersistStream().get().read()); } + public void testCloseUnusedPipes_notConnected() throws IOException { + NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); + Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + Environment env = TestEnvironment.newEnvironment(settings); + + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job", + true, true, true, true, true, true); + } + + public void testCloseOpenedPipesOnError() throws IOException { + + NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); + InputStream logStream = mock(InputStream.class); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) + .thenReturn(logStream); + OutputStream commandStream = mock(OutputStream.class); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) + .thenReturn(commandStream); + OutputStream processInStream = mock(OutputStream.class); + when(namedPipeHelper.openNamedPipeOutputStream(contains("input"), any(Duration.class))) + .thenReturn(processInStream); + InputStream processOutStream = mock(InputStream.class); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))) + .thenReturn(processOutStream); + OutputStream restoreStream = mock(OutputStream.class); + when(namedPipeHelper.openNamedPipeOutputStream(contains("restore"), any(Duration.class))) + .thenReturn(restoreStream); + // opening this pipe will throw + when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class))).thenThrow(new IOException()); + + Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + Environment env = TestEnvironment.newEnvironment(settings); + ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job", + true, true, true, true, true, true); + + expectThrows(IOException.class, () -> processPipes.connectStreams(Duration.ofSeconds(2))); + + // check the pipes successfully opened were then closed + verify(logStream, times(1)).close(); + verify(commandStream, times(1)).close(); + verify(processInStream, times(1)).close(); + verify(processOutStream, times(1)).close(); + verify(restoreStream, times(1)).close(); + } }