diff --git a/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java b/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java index 1945def2437..510af33f3c7 100644 --- a/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java +++ b/plugin/src/main/java/org/elasticsearch/license/XPackInfoResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.license; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -230,16 +231,20 @@ public class XPackInfoResponse extends ActionResponse { @Nullable private final String description; private final boolean available; private final boolean enabled; + @Nullable private final Map nativeCodeInfo; public FeatureSet(StreamInput in) throws IOException { - this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean()); + this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean(), + in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED) ? in.readMap() : null); } - public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled) { + public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled, + @Nullable Map nativeCodeInfo) { this.name = name; this.description = description; this.available = available; this.enabled = enabled; + this.nativeCodeInfo = nativeCodeInfo; } public String name() { @@ -259,6 +264,11 @@ public class XPackInfoResponse extends ActionResponse { return enabled; } + @Nullable + public Map nativeCodeInfo() { + return nativeCodeInfo; + } + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (description != null) { @@ -266,6 +276,9 @@ public class XPackInfoResponse extends ActionResponse { } builder.field("available", available); builder.field("enabled", enabled); + if (nativeCodeInfo != null) { + builder.field("native_code_info", nativeCodeInfo); + } return builder.endObject(); } @@ -274,6 +287,9 @@ public class XPackInfoResponse extends ActionResponse { out.writeOptionalString(description); out.writeBoolean(available); out.writeBoolean(enabled); + if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { + out.writeMap(nativeCodeInfo); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java index 6995bf66d61..221f34fd44f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackFeatureSet.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Map; public interface XPackFeatureSet { @@ -23,6 +24,8 @@ public interface XPackFeatureSet { boolean enabled(); + Map nativeCodeInfo(); + Usage usage(); abstract class Usage implements ToXContentObject, NamedWriteable { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java b/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java index fe764a74b94..0cbb050b646 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/action/TransportXPackInfoAction.java @@ -59,7 +59,8 @@ public class TransportXPackInfoAction extends HandledTransportAction featureSets = this.featureSets.stream().map(fs -> - new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled())) + new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled(), + request.isVerbose() ? fs.nativeCodeInfo() : null)) .collect(Collectors.toSet()); featureSetsInfo = new XPackInfoResponse.FeatureSetsInfo(featureSets); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java index 072440730ad..f4e74f179ab 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/graph/GraphFeatureSet.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.graph; import java.io.IOException; +import java.util.Map; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -47,6 +48,11 @@ public class GraphFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0fc8b60a5bf..33c0a9a80ab 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -81,6 +81,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeController; +import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.ProcessCtrl; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -256,12 +257,17 @@ public class MachineLearning extends Plugin implements ActionPlugin { NormalizerProcessFactory normalizerProcessFactory; if (USE_NATIVE_PROCESS_OPTION.get(settings)) { try { - NativeController nativeController = new NativeController(env, new NamedPipeHelper()); - nativeController.tailLogsInThread(); + NativeController nativeController = NativeControllerHolder.getNativeController(settings); + if (nativeController == null) { + // This will only only happen when path.home is not set, which is disallowed in production + throw new ElasticsearchException("Failed to create native process controller for Machine Learning"); + } autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client); normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController); } catch (IOException e) { - throw new ElasticsearchException("Failed to create native process factories", e); + // This also should not happen in production, as the MachineLearningFeatureSet should have + // hit the same error first and brought down the node with a friendlier error message + throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e); } } else { autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) -> diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 131825d81c6..11df1d24f0c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -5,26 +5,47 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ml.job.process.NativeController; +import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeoutException; public class MachineLearningFeatureSet implements XPackFeatureSet { private final boolean enabled; private final XPackLicenseState licenseState; + private final Map nativeCodeInfo; @Inject public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) { this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings); this.licenseState = licenseState; + Map nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO; + try { + NativeController nativeController = NativeControllerHolder.getNativeController(settings); + if (nativeController != null) { + nativeCodeInfo = nativeController.getNativeCodeInfo(); + } + } catch (IOException | TimeoutException e) { + Loggers.getLogger(MachineLearningFeatureSet.class).error("Cannot get native code info for Machine Learning", e); + if (enabled) { + throw new ElasticsearchException("Cannot communicate with Machine Learning native code " + + "- please check that you are running on a supported platform"); + } + } + this.nativeCodeInfo = nativeCodeInfo; } @Override @@ -47,6 +68,11 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return nativeCodeInfo; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java index 62342124526..aca0a8f4ce7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; @@ -15,8 +16,13 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** @@ -30,6 +36,15 @@ public class NativeController { private static final String START_COMMAND = "start"; + public static final Map UNKNOWN_NATIVE_CODE_INFO; + + static { + Map unknownInfo = new HashMap<>(2); + unknownInfo.put("version", "N/A"); + unknownInfo.put("build_hash", "N/A"); + UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo); + } + private final CppLogMessageHandler cppLogHandler; private final OutputStream commandStream; private Thread logTailThread; @@ -59,6 +74,23 @@ public class NativeController { return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT); } + public Map getNativeCodeInfo() throws TimeoutException { + String copyrightMessage = cppLogHandler.getCppCopyright(CONTROLLER_CONNECT_TIMEOUT); + Matcher matcher = Pattern.compile("Version (.+) \\(Build ([0-9a-f]+)\\) Copyright ").matcher(copyrightMessage); + if (matcher.find()) { + Map info = new HashMap<>(2); + info.put("version", matcher.group(1)); + info.put("build_hash", matcher.group(2)); + return info; + } else { + // If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc + // in the machine-learning-cpp repo without changing the pattern above to match + String msg = "Unexpected native controller process copyright format: " + copyrightMessage; + LOGGER.error(msg); + throw new ElasticsearchException(msg); + } + } + public void startProcess(List command) throws IOException { // Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process for (String arg : command) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java new file mode 100644 index 00000000000..b67c0a9fcc2 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; + +import java.io.IOException; + +/** + * Manages a singleton NativeController so that both the MachineLearningFeatureSet and MachineLearning classes can + * get access to the same one. + */ +public class NativeControllerHolder { + + private static final Object lock = new Object(); + private static NativeController nativeController; + + private NativeControllerHolder() { + } + + /** + * Get a reference to the singleton native process controller. + * + * The NativeController is created lazily to allow time for the C++ process to be started before connection is attempted. + * + * null is returned to tests that haven't bothered to set up path.home and all runs where useNativeProcess=false. + * + * Calls may throw an exception if initial connection to the C++ process fails. + */ + public static NativeController getNativeController(Settings settings) throws IOException { + + if (Environment.PATH_HOME_SETTING.exists(settings) && MachineLearning.USE_NATIVE_PROCESS_OPTION.get(settings)) { + synchronized (lock) { + if (nativeController == null) { + nativeController = new NativeController(new Environment(settings), new NamedPipeHelper()); + nativeController.tailLogsInThread(); + } + } + return nativeController; + } + return null; + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index ef78ce53d09..3b1756fcdac 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -47,6 +47,7 @@ public class CppLogMessageHandler implements Closeable { private final int errorStoreSize; private final Deque errorStore; private final CountDownLatch pidLatch; + private final CountDownLatch cppCopyrightLatch; private volatile boolean hasLogStreamEnded; private volatile boolean seenFatalError; private volatile long pid; @@ -70,6 +71,7 @@ public class CppLogMessageHandler implements Closeable { this.errorStoreSize = errorStoreSize; errorStore = ConcurrentCollections.newDeque(); pidLatch = new CountDownLatch(1); + cppCopyrightLatch = new CountDownLatch(1); hasLogStreamEnded = false; } @@ -133,7 +135,23 @@ public class CppLogMessageHandler implements Closeable { return pid; } - public String getCppCopyright() { + /** + * Get the process ID of the C++ process whose log messages are being read. This will + * arrive in the first log message logged by the C++ process. They all log a copyright + * message immediately on startup so it should not take long to arrive, but will not be + * available instantly after the process starts. + */ + public String getCppCopyright(Duration timeout) throws TimeoutException { + if (cppCopyright == null) { + try { + cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (cppCopyright == null) { + throw new TimeoutException("Timed out waiting for C++ process copyright"); + } + } return cppCopyright; } @@ -193,6 +211,7 @@ public class CppLogMessageHandler implements Closeable { String latestMessage = msg.getMessage(); if (cppCopyright == null && latestMessage.contains("Copyright")) { cppCopyright = latestMessage; + cppCopyrightLatch.countDown(); } // TODO: Is there a way to preserve the original timestamp when re-logging? if (jobId != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java index a3eaeddcdf2..0e6aa22a5c6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/monitoring/MonitoringFeatureSet.java @@ -54,6 +54,11 @@ public class MonitoringFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled(), exportersUsage(exporters)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java index 8ba96c5c166..44d531d4287 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityFeatureSet.java @@ -83,6 +83,11 @@ public class SecurityFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { Map realmsUsage = buildRealmsUsage(realms); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java index cb476a82f68..e7468c2a4ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherFeatureSet.java @@ -53,6 +53,11 @@ public class WatcherFeatureSet implements XPackFeatureSet { return enabled; } + @Override + public Map nativeCodeInfo() { + return null; + } + @Override public XPackFeatureSet.Usage usage() { return new Usage(available(), enabled(), watcherService != null ? watcherService.usageStats() : Collections.emptyMap()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java index 211d995fd2d..142b4fabcca 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java @@ -18,6 +18,8 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.contains; @@ -25,9 +27,9 @@ import static org.mockito.Mockito.when; public class NativeControllerTests extends ESTestCase { - public void testNativeController() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - Environment env = new Environment(settings); + private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + + public void testStartProcessCommand() throws IOException { NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]); @@ -43,10 +45,34 @@ public class NativeControllerTests extends ESTestCase { command.add("--arg2=42"); command.add("--arg3=something with spaces"); - NativeController nativeController = new NativeController(env, namedPipeHelper); + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); nativeController.startProcess(command); assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", commandStream.toString(StandardCharsets.UTF_8.name())); } + + public void testGetNativeCodeInfo() throws IOException, TimeoutException { + + String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211," + + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; + + NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class); + ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))) + .thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))) + .thenReturn(commandStream); + + NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper); + nativeController.tailLogsInThread(); + Map nativeCodeInfo = nativeController.getNativeCodeInfo(); + + assertNotNull(nativeCodeInfo); + assertEquals(2, nativeCodeInfo.size()); + assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version")); + assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash")); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java index 37d889a14d2..3e7ce2a5d41 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java @@ -42,9 +42,11 @@ public class CppLogMessageHandlerTests extends ESTestCase { handler.tailStream(); assertTrue(handler.hasLogStreamEnded()); - assertEquals(10211L, handler.getPid(Duration.ofMillis(1))); + // Since this is all being done in one thread and we know the stream has + // been completely consumed at this point the wait duration can be zero + assertEquals(10211L, handler.getPid(Duration.ZERO)); assertEquals("controller (64 bit): Version based on 6.0.0-alpha1 (Build b0d6ef8819418c) " - + "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright()); + + "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright(Duration.ZERO)); assertEquals("Did not understand verb 'a'\n", handler.getErrors()); assertFalse(handler.seenFatalError()); }