[ML] Better handling of errors if native controller dies (elastic/x-pack-elasticsearch#2141)

If the native controller dies or is killed then requests to open jobs
now immediately return with an error that says what the problem is.
The error that is logged also now clearly records the problem.

Previously open job requests would time out if the native controller
was not running, and logged errors were not easy to understand without
in-depth knowledge of the ML code.

relates elastic/x-pack-elasticsearch#2140

Original commit: elastic/x-pack-elasticsearch@fc7f074d4a
This commit is contained in:
David Roberts 2017-08-01 15:53:58 +01:00 committed by GitHub
parent 7291eb55fe
commit 8487e1e0fe
3 changed files with 61 additions and 16 deletions

View File

@ -468,7 +468,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
new WaitForPersistentTaskStatusListener<JobParams>() { new WaitForPersistentTaskStatusListener<JobParams>() {
@Override @Override
public void onResponse(PersistentTask<JobParams> persistentTask) { public void onResponse(PersistentTask<JobParams> persistentTask) {
listener.onResponse(new Response(predicate.opened)); if (predicate.exception != null) {
listener.onFailure(predicate.exception);
} else {
listener.onResponse(new Response(predicate.opened));
}
} }
@Override @Override
@ -520,9 +524,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
} }
} }
/**
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
private class JobPredicate implements Predicate<PersistentTask<?>> { private class JobPredicate implements Predicate<PersistentTask<?>> {
private volatile boolean opened; private volatile boolean opened;
private volatile Exception exception;
@Override @Override
public boolean test(PersistentTask<?> persistentTask) { public boolean test(PersistentTask<?> persistentTask) {
@ -539,12 +548,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
opened = true; opened = true;
return true; return true;
case CLOSING: case CLOSING:
throw ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be " exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be "
+ JobState.OPENED); + JobState.OPENED);
return true;
case FAILED: case FAILED:
default: default:
throw new IllegalStateException("Unexpected job state [" + jobState exception = ExceptionsHelper.serverError("Unexpected job state [" + jobState
+ "] while waiting for job to be " + JobState.OPENED); + "] while waiting for job to be " + JobState.OPENED);
return true;
} }
} }
} }

View File

@ -101,6 +101,10 @@ public class NativeController {
} }
public void startProcess(List<String> command) throws IOException { public void startProcess(List<String> command) throws IOException {
if (command.isEmpty()) {
throw new IllegalArgumentException("Cannot start process: no command supplied");
}
// Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process
for (String arg : command) { for (String arg : command) {
if (arg.contains("\t")) { if (arg.contains("\t")) {
@ -111,6 +115,12 @@ public class NativeController {
} }
} }
if (cppLogHandler.hasLogStreamEnded()) {
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped";
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
synchronized (commandStream) { synchronized (commandStream) {
LOGGER.debug("Starting process with command: " + command); LOGGER.debug("Starting process with command: " + command);
commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8));
@ -131,6 +141,12 @@ public class NativeController {
throw new IllegalArgumentException("native controller will not kill self: " + pid); throw new IllegalArgumentException("native controller will not kill self: " + pid);
} }
if (cppLogHandler.hasLogStreamEnded()) {
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped";
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
synchronized (commandStream) { synchronized (commandStream) {
LOGGER.debug("Killing process with PID: " + pid); LOGGER.debug("Killing process with PID: " + pid);
commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8));

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.job.process; package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -17,6 +18,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -27,17 +29,19 @@ import static org.mockito.Mockito.when;
public class NativeControllerTests extends ESTestCase { public class NativeControllerTests extends ESTestCase {
private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
public void testStartProcessCommand() throws IOException { public void testStartProcessCommand() throws IOException {
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]); ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]);
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
.thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
.thenReturn(commandStream);
List<String> command = new ArrayList<>(); List<String> command = new ArrayList<>();
command.add("my_process"); command.add("my_process");
@ -54,17 +58,11 @@ public class NativeControllerTests extends ESTestCase {
public void testGetNativeCodeInfo() throws IOException, TimeoutException { public void testGetNativeCodeInfo() throws IOException, TimeoutException {
String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8)); ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
.thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
.thenReturn(commandStream);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.tailLogsInThread(); nativeController.tailLogsInThread();
@ -75,4 +73,24 @@ public class NativeControllerTests extends ESTestCase {
assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version")); assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version"));
assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash")); assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash"));
} }
public void testControllerDeath() throws Exception {
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.tailLogsInThread();
// As soon as the log stream ends startProcess should think the native controller has died
assertBusy(() -> {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> nativeController.startProcess(Collections.singletonList("my process")));
assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage());
});
}
} }