[ML] Close any opened pipes if there is an error connecting to the process (#44869)

This commit is contained in:
David Kyle 2019-07-29 10:44:44 +01:00
parent 24873dd3e3
commit d05f12dadb
2 changed files with 92 additions and 14 deletions

View File

@ -127,23 +127,53 @@ public class ProcessPipes {
public void connectStreams(Duration timeout) throws IOException { public void connectStreams(Duration timeout) throws IOException {
// The order here is important. It must match the order that the C++ process tries to connect to the pipes, otherwise // The order here is important. It must match the order that the C++ process tries to connect to the pipes, otherwise
// a timeout is guaranteed. Also change api::CIoManager in the C++ code if changing the order here. // a timeout is guaranteed. Also change api::CIoManager in the C++ code if changing the order here.
if (logPipeName != null) { try {
logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout); if (logPipeName != null) {
logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout);
}
if (commandPipeName != null) {
commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout);
}
if (processInPipeName != null) {
processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout);
}
if (processOutPipeName != null) {
processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout);
}
if (restorePipeName != null) {
restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout);
}
if (persistPipeName != null) {
persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout);
}
} catch (IOException ioe) {
try {
closeUnusedStreams();
} catch (IOException suppressed) {
ioe.addSuppressed(new IOException("Error closing process pipes", suppressed));
}
throw ioe;
} }
if (commandPipeName != null) { }
commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout);
private void closeUnusedStreams() throws IOException {
if (logStream != null) {
logStream.close();
} }
if (processInPipeName != null) { if (commandStream != null) {
processInStream = namedPipeHelper.openNamedPipeOutputStream(processInPipeName, timeout); commandStream.close();
} }
if (processOutPipeName != null) { if (processInStream != null) {
processOutStream = namedPipeHelper.openNamedPipeInputStream(processOutPipeName, timeout); processInStream.close();
} }
if (restorePipeName != null) { if (processOutStream != null) {
restoreStream = namedPipeHelper.openNamedPipeOutputStream(restorePipeName, timeout); processOutStream.close();
} }
if (persistPipeName != null) { if (restoreStream != null) {
persistStream = namedPipeHelper.openNamedPipeInputStream(persistPipeName, timeout); restoreStream.close();
}
if (persistStream != null) {
persistStream.close();
} }
} }

View File

@ -11,17 +11,21 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains; import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ProcessPipesTests extends ESTestCase { public class ProcessPipesTests extends ESTestCase {
@ -34,7 +38,7 @@ public class ProcessPipesTests extends ESTestCase {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings); Environment env = TestEnvironment.newEnvironment(settings);
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
.thenReturn(new ByteArrayInputStream(LOG_BYTES)); .thenReturn(new ByteArrayInputStream(LOG_BYTES));
ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
@ -89,4 +93,48 @@ public class ProcessPipesTests extends ESTestCase {
assertEquals(6, processPipes.getPersistStream().get().read()); assertEquals(6, processPipes.getPersistStream().get().read());
} }
public void testCloseUnusedPipes_notConnected() throws IOException {
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true, true);
}
public void testCloseOpenedPipesOnError() throws IOException {
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
InputStream logStream = mock(InputStream.class);
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
.thenReturn(logStream);
OutputStream commandStream = mock(OutputStream.class);
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class)))
.thenReturn(commandStream);
OutputStream processInStream = mock(OutputStream.class);
when(namedPipeHelper.openNamedPipeOutputStream(contains("input"), any(Duration.class)))
.thenReturn(processInStream);
InputStream processOutStream = mock(InputStream.class);
when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class)))
.thenReturn(processOutStream);
OutputStream restoreStream = mock(OutputStream.class);
when(namedPipeHelper.openNamedPipeOutputStream(contains("restore"), any(Duration.class)))
.thenReturn(restoreStream);
// opening this pipe will throw
when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class))).thenThrow(new IOException());
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true, true);
expectThrows(IOException.class, () -> processPipes.connectStreams(Duration.ofSeconds(2)));
// check the pipes successfully opened were then closed
verify(logStream, times(1)).close();
verify(commandStream, times(1)).close();
verify(processInStream, times(1)).close();
verify(processOutStream, times(1)).close();
verify(restoreStream, times(1)).close();
}
} }