This adds the node name where we fail to start a process via the native controller to facilitate debugging as otherwise it might not be known to which node the job was allocated.
This commit is contained in:
parent
770d8e9e39
commit
a6eb20ad35
|
@ -437,7 +437,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
|
|||
NormalizerProcessFactory normalizerProcessFactory;
|
||||
if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) {
|
||||
try {
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
|
||||
if (nativeController == null) {
|
||||
// This will only only happen when path.home is not set, which is disallowed in production
|
||||
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
|
||||
|
|
|
@ -79,7 +79,8 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
|
|||
if (enabled && XPackPlugin.transportClientMode(environment.settings()) == false) {
|
||||
try {
|
||||
if (isRunningOnMlPlatform(true)) {
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(),
|
||||
environment);
|
||||
if (nativeController != null) {
|
||||
nativeCodeInfo = nativeController.getNativeCodeInfo();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.io.IOException;
|
|||
public class MlLifeCycleService {
|
||||
|
||||
private final Environment environment;
|
||||
private final ClusterService clusterService;
|
||||
private final DatafeedManager datafeedManager;
|
||||
private final AutodetectProcessManager autodetectProcessManager;
|
||||
private final MlMemoryTracker memoryTracker;
|
||||
|
@ -26,6 +27,7 @@ public class MlLifeCycleService {
|
|||
public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
|
||||
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
|
||||
this.environment = environment;
|
||||
this.clusterService = clusterService;
|
||||
this.datafeedManager = datafeedManager;
|
||||
this.autodetectProcessManager = autodetectProcessManager;
|
||||
this.memoryTracker = memoryTracker;
|
||||
|
@ -46,7 +48,7 @@ public class MlLifeCycleService {
|
|||
if (datafeedManager != null) {
|
||||
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
|
||||
}
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
|
||||
NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
|
||||
if (nativeController != null) {
|
||||
// This kills autodetect processes WITHOUT closing the jobs, so they get reallocated.
|
||||
if (autodetectProcessManager != null) {
|
||||
|
|
|
@ -52,15 +52,17 @@ public class NativeController {
|
|||
UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo);
|
||||
}
|
||||
|
||||
private final String localNodeName;
|
||||
private final CppLogMessageHandler cppLogHandler;
|
||||
private final OutputStream commandStream;
|
||||
|
||||
NativeController(Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
|
||||
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
|
||||
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
|
||||
true, true, false, false, false, false);
|
||||
processPipes.connectStreams(CONTROLLER_CONNECT_TIMEOUT);
|
||||
cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
|
||||
commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
|
||||
this.localNodeName = localNodeName;
|
||||
this.cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
|
||||
this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
|
||||
}
|
||||
|
||||
void tailLogsInThread() {
|
||||
|
@ -107,7 +109,8 @@ public class NativeController {
|
|||
}
|
||||
|
||||
if (cppLogHandler.hasLogStreamEnded()) {
|
||||
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped";
|
||||
String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped on node ["
|
||||
+ localNodeName + "]";
|
||||
LOGGER.error(msg);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
|
@ -133,7 +136,8 @@ public class NativeController {
|
|||
}
|
||||
|
||||
if (cppLogHandler.hasLogStreamEnded()) {
|
||||
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped";
|
||||
String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped on node ["
|
||||
+ localNodeName + "]";
|
||||
LOGGER.error(msg);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
|
|
|
@ -32,12 +32,12 @@ public class NativeControllerHolder {
|
|||
*
|
||||
* Calls may throw an exception if initial connection to the C++ process fails.
|
||||
*/
|
||||
public static NativeController getNativeController(Environment environment) throws IOException {
|
||||
public static NativeController getNativeController(String localNodeName, Environment environment) throws IOException {
|
||||
|
||||
if (MachineLearningField.AUTODETECT_PROCESS.get(environment.settings())) {
|
||||
synchronized (lock) {
|
||||
if (nativeController == null) {
|
||||
nativeController = new NativeController(environment, new NamedPipeHelper());
|
||||
nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper());
|
||||
nativeController.tailLogsInThread();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class NativeControllerTests extends ESTestCase {
|
||||
|
||||
private static final String NODE_NAME = "native-controller-tests-node";
|
||||
|
||||
private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
|
||||
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
|
||||
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
|
||||
|
@ -50,7 +52,7 @@ public class NativeControllerTests extends ESTestCase {
|
|||
command.add("--arg2=42");
|
||||
command.add("--arg3=something with spaces");
|
||||
|
||||
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
nativeController.startProcess(command);
|
||||
|
||||
assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
|
||||
|
@ -65,7 +67,7 @@ public class NativeControllerTests extends ESTestCase {
|
|||
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
|
||||
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
|
||||
|
||||
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
nativeController.tailLogsInThread();
|
||||
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();
|
||||
|
||||
|
@ -83,7 +85,7 @@ public class NativeControllerTests extends ESTestCase {
|
|||
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
|
||||
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
|
||||
|
||||
NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
|
||||
nativeController.tailLogsInThread();
|
||||
|
||||
// As soon as the log stream ends startProcess should think the native controller has died
|
||||
|
@ -91,7 +93,8 @@ public class NativeControllerTests extends ESTestCase {
|
|||
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
||||
() -> nativeController.startProcess(Collections.singletonList("my process")));
|
||||
|
||||
assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage());
|
||||
assertEquals("Cannot start process [my process]: native controller process has stopped on node " +
|
||||
"[native-controller-tests-node]", e.getMessage());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue