diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 861eea809ca..245f34387ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -437,7 +437,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu NormalizerProcessFactory normalizerProcessFactory; if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) { try { - NativeController nativeController = NativeControllerHolder.getNativeController(environment); + NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment); if (nativeController == null) { // This will only only happen when path.home is not set, which is disallowed in production throw new ElasticsearchException("Failed to create native process controller for Machine Learning"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index c913babbaa4..bcfab50c21e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -79,7 +79,8 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { if (enabled && XPackPlugin.transportClientMode(environment.settings()) == false) { try { if (isRunningOnMlPlatform(true)) { - NativeController nativeController = NativeControllerHolder.getNativeController(environment); + NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), + environment); if (nativeController != null) { nativeCodeInfo = nativeController.getNativeCodeInfo(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index 06d9b749e1a..7309afa6b3a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -19,6 +19,7 @@ import java.io.IOException; public class MlLifeCycleService { private final Environment environment; + private final ClusterService clusterService; private final DatafeedManager datafeedManager; private final AutodetectProcessManager autodetectProcessManager; private final MlMemoryTracker memoryTracker; @@ -26,6 +27,7 @@ public class MlLifeCycleService { public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) { this.environment = environment; + this.clusterService = clusterService; this.datafeedManager = datafeedManager; this.autodetectProcessManager = autodetectProcessManager; this.memoryTracker = memoryTracker; @@ -46,7 +48,7 @@ public class MlLifeCycleService { if (datafeedManager != null) { datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown(); } - NativeController nativeController = NativeControllerHolder.getNativeController(environment); + NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment); if (nativeController != null) { // This kills autodetect processes WITHOUT closing the jobs, so they get reallocated. if (autodetectProcessManager != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index 721e07721e3..1be410741c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -52,15 +52,17 @@ public class NativeController { UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo); } + private final String localNodeName; private final CppLogMessageHandler cppLogHandler; private final OutputStream commandStream; - NativeController(Environment env, NamedPipeHelper namedPipeHelper) throws IOException { + NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException { ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null, true, true, false, false, false, false); processPipes.connectStreams(CONTROLLER_CONNECT_TIMEOUT); - cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get()); - commandStream = new BufferedOutputStream(processPipes.getCommandStream().get()); + this.localNodeName = localNodeName; + this.cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get()); + this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get()); } void tailLogsInThread() { @@ -107,7 +109,8 @@ public class NativeController { } if (cppLogHandler.hasLogStreamEnded()) { - String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped"; + String msg = "Cannot start process [" + command.get(0) + "]: native controller process has stopped on node [" + + localNodeName + "]"; LOGGER.error(msg); throw new ElasticsearchException(msg); } @@ -133,7 +136,8 @@ public class NativeController { } if (cppLogHandler.hasLogStreamEnded()) { - String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped"; + String msg = "Cannot kill process with PID [" + pid + "]: native controller process has stopped on node [" + + localNodeName + "]"; LOGGER.error(msg); throw new ElasticsearchException(msg); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java index 67e24b44a84..5365a11f6b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java @@ -32,12 +32,12 @@ public class NativeControllerHolder { * * Calls may throw an exception if initial connection to the C++ process fails. */ - public static NativeController getNativeController(Environment environment) throws IOException { + public static NativeController getNativeController(String localNodeName, Environment environment) throws IOException { if (MachineLearningField.AUTODETECT_PROCESS.get(environment.settings())) { synchronized (lock) { if (nativeController == null) { - nativeController = new NativeController(environment, new NamedPipeHelper()); + nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper()); nativeController.tailLogsInThread(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java index ac00e8a24e1..c799f142359 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java @@ -30,6 +30,8 @@ import static org.mockito.Mockito.when; public class NativeControllerTests extends ESTestCase { + private static final String NODE_NAME = "native-controller-tests-node"; + private static final String TEST_MESSAGE = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; @@ -50,7 +52,7 @@ public class NativeControllerTests extends ESTestCase { command.add("--arg2=42"); command.add("--arg3=something with spaces"); - NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); nativeController.startProcess(command); assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", @@ -65,7 +67,7 @@ public class NativeControllerTests extends ESTestCase { ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); - NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); nativeController.tailLogsInThread(); Map nativeCodeInfo = nativeController.getNativeCodeInfo(); @@ -83,7 +85,7 @@ public class NativeControllerTests extends ESTestCase { ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); - NativeController nativeController = new NativeController(TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); nativeController.tailLogsInThread(); // As soon as the log stream ends startProcess should think the native controller has died @@ -91,7 +93,8 @@ public class NativeControllerTests extends ESTestCase { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> nativeController.startProcess(Collections.singletonList("my process"))); - assertEquals("Cannot start process [my process]: native controller process has stopped", e.getMessage()); + assertEquals("Cannot start process [my process]: native controller process has stopped on node " + + "[native-controller-tests-node]", e.getMessage()); }); } }