From 8487e1e0fe5b966c3d493c1e2dc5b5d1e6f8cd66 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 1 Aug 2017 15:53:58 +0100 Subject: [PATCH] [ML] Better handling of errors if native controller dies (elastic/x-pack-elasticsearch#2141) If the native controller dies or is killed then requests to open jobs now immediately return with an error that says what the problem is. The error that is logged also now clearly records the problem. Previously open job requests would time out if the native controller was not running, and logged errors were not easy to understand without in-depth knowledge of the ML code. relates elastic/x-pack-elasticsearch#2140 Original commit: elastic/x-pack-elasticsearch@fc7f074d4a2db05a9b11c1ad038490c87c72bfdf --- .../xpack/ml/action/OpenJobAction.java | 17 +++++-- .../ml/job/process/NativeController.java | 16 +++++++ .../ml/job/process/NativeControllerTests.java | 44 +++++++++++++------ 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 0469356daa2..be85e53d998 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -468,7 +468,11 @@ public class OpenJobAction extends Action() { @Override public void onResponse(PersistentTask persistentTask) { - listener.onResponse(new Response(predicate.opened)); + if (predicate.exception != null) { + listener.onFailure(predicate.exception); + } else { + listener.onResponse(new Response(predicate.opened)); + } } @Override @@ -520,9 +524,14 @@ public class OpenJobAction extends Action> { private volatile boolean opened; + private volatile Exception exception; @Override public boolean test(PersistentTask persistentTask) { @@ -539,12 +548,14 @@ public class OpenJobAction extends Action command) throws IOException { + if (command.isEmpty()) { + throw new IllegalArgumentException("Cannot start process: no command supplied"); + } + // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process for (String arg : command) { if (arg.contains("\t")) { @@ -111,6 +115,12 @@ public class NativeController { } } + if (cppLogHandler.hasLogStreamEnded()) { + String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped"; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + synchronized (commandStream) { LOGGER.debug("Starting process with command: " + command); commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); @@ -131,6 +141,12 @@ public class NativeController { throw new IllegalArgumentException("native controller will not kill self: " + pid); } + if (cppLogHandler.hasLogStreamEnded()) { + String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped"; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + synchronized (commandStream) { LOGGER.debug("Killing process with PID: " + pid); commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java index 142b4fabcca..23abce52124 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; @@ -17,6 +18,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -27,17 +29,19 @@ import static org.mockito.Mockito.when; public class NativeControllerTests extends ESTestCase { + private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; + private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); public void testStartProcessCommand() throws IOException { NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]); - when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) - .thenReturn(logStream); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); - when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) - .thenReturn(commandStream); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); List command = new ArrayList<>(); command.add("my_process"); @@ -54,17 +58,11 @@ public class NativeControllerTests extends ESTestCase { public void testGetNativeCodeInfo() throws IOException, TimeoutException { - String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," - + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " - + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; - NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); - ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8)); - when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) - .thenReturn(logStream); + ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); - when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) - .thenReturn(commandStream); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); nativeController.tailLogsInThread(); @@ -75,4 +73,24 @@ public class NativeControllerTests extends ESTestCase { assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version")); assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash")); } + + public void testControllerDeath() throws Exception { + + NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); + ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); + nativeController.tailLogsInThread(); + + // As soon as the log stream ends startProcess should think the native controller has died + assertBusy(() -> { + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> nativeController.startProcess(Collections.singletonList("my process"))); + + assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage()); + }); + } }