diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 0469356daa2..be85e53d998 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -468,7 +468,11 @@ public class OpenJobAction extends Action() { @Override public void onResponse(PersistentTask persistentTask) { - listener.onResponse(new Response(predicate.opened)); + if (predicate.exception != null) { + listener.onFailure(predicate.exception); + } else { + listener.onResponse(new Response(predicate.opened)); + } } @Override @@ -520,9 +524,14 @@ public class OpenJobAction extends Action> { private volatile boolean opened; + private volatile Exception exception; @Override public boolean test(PersistentTask persistentTask) { @@ -539,12 +548,14 @@ public class OpenJobAction extends Action command) throws IOException { + if (command.isEmpty()) { + throw new IllegalArgumentException("Cannot start process: no command supplied"); + } + // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process for (String arg : command) { if (arg.contains("\t")) { @@ -111,6 +115,12 @@ public class NativeController { } } + if (cppLogHandler.hasLogStreamEnded()) { + String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped"; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + synchronized (commandStream) { LOGGER.debug("Starting process with command: " + command); commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); @@ -131,6 +141,12 @@ public class NativeController { throw new IllegalArgumentException("native controller will not kill self: " + pid); } + if (cppLogHandler.hasLogStreamEnded()) { + String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped"; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + synchronized (commandStream) { LOGGER.debug("Killing process with PID: " + pid); commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java index 142b4fabcca..23abce52124 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; @@ -17,6 +18,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -27,17 +29,19 @@ import static org.mockito.Mockito.when; public class NativeControllerTests extends ESTestCase { + private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; + private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); public void testStartProcessCommand() throws IOException { NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]); - when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) - .thenReturn(logStream); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); - when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) - .thenReturn(commandStream); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); List command = new ArrayList<>(); command.add("my_process"); @@ -54,17 +58,11 @@ public class NativeControllerTests extends ESTestCase { public void testGetNativeCodeInfo() throws IOException, TimeoutException { - String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," - + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " - + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; - NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); - ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8)); - when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) - .thenReturn(logStream); + ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); - when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) - .thenReturn(commandStream); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); nativeController.tailLogsInThread(); @@ -75,4 +73,24 @@ public class NativeControllerTests extends ESTestCase { assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version")); assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash")); } + + public void testControllerDeath() throws Exception { + + NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); + ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); + nativeController.tailLogsInThread(); + + // As soon as the log stream ends startProcess should think the native controller has died + assertBusy(() -> { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> nativeController.startProcess(Collections.singletonList("my process"))); + + assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage()); + }); + } }